diff --git a/datahub-web-react/src/app/ingest/secret/SecretsList.tsx b/datahub-web-react/src/app/ingest/secret/SecretsList.tsx index cd5a4c2a0af85..ee695545bafa2 100644 --- a/datahub-web-react/src/app/ingest/secret/SecretsList.tsx +++ b/datahub-web-react/src/app/ingest/secret/SecretsList.tsx @@ -174,7 +174,7 @@ export const SecretsList = () => { ); setTimeout(() => { refetch(); - }, 2000); + }, 3000); }) .catch((e) => { message.destroy(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 1d4581218e09a..20273550c83dc 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -968,8 +968,9 @@ private IngestAspectsResult ingestAspectsToLocalDB( // lock) // Initial database state from database - Map> batchAspects = + final Map> batchAspects = aspectDao.getLatestAspects(opContext, urnAspects, true); + final Map> updatedLatestAspects; // read #2 (potentially) final Map> nextVersions = @@ -989,7 +990,6 @@ private IngestAspectsResult ingestAspectsToLocalDB( // These items are new items from side effects Map> sideEffects = updatedItems.getFirst(); - final Map> updatedLatestAspects; final Map> updatedNextVersions; Map> newLatestAspects = @@ -1024,6 +1024,7 @@ private IngestAspectsResult ingestAspectsToLocalDB( .collect(Collectors.toList()); } else { changeMCPs = updatedItems.getSecond(); + updatedLatestAspects = batchAspects; } // No changes, return @@ -1080,7 +1081,7 @@ private IngestAspectsResult ingestAspectsToLocalDB( Latest aspect after possible in-memory mutation */ final SystemAspect latestAspect = - batchAspects + updatedLatestAspects .getOrDefault(writeItem.getUrn().toString(), Map.of()) .get(writeItem.getAspectName()); @@ -1145,8 +1146,9 @@ This condition is specifically for an older conditional write ingestAspectIfNotP // Only consider retention when there was a previous version .filter( result -> - batchAspects.containsKey(result.getUrn().toString()) - && batchAspects + updatedLatestAspects.containsKey( + result.getUrn().toString()) + && updatedLatestAspects .get(result.getUrn().toString()) .containsKey( result.getRequest().getAspectName())) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceImpl.java index ced4b8a65b976..8167fa278e58e 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceImpl.java @@ -76,10 +76,8 @@ public List linkLatestVersion( Urn versionSet, Urn newLatestVersion, VersionPropertiesInput inputProperties) { - List ingestResults = new ArrayList<>(); AspectRetriever aspectRetriever = opContext.getAspectRetriever(); String sortId; - Long versionSetConstraint; Long versionPropertiesConstraint = -1L; VersionSetKey versionSetKey = (VersionSetKey) @@ -93,36 +91,26 @@ public List linkLatestVersion( + newLatestVersion.getEntityType()); } if (!aspectRetriever.entityExists(ImmutableSet.of(versionSet)).get(versionSet)) { - MetadataChangeProposal versionSetKeyProposal = new MetadataChangeProposal(); - versionSetKeyProposal.setEntityUrn(versionSet); - versionSetKeyProposal.setEntityType(VERSION_SET_ENTITY_NAME); - versionSetKeyProposal.setAspectName(VERSION_SET_KEY_ASPECT_NAME); - versionSetKeyProposal.setAspect(GenericRecordUtils.serializeAspect(versionSetKey)); - versionSetKeyProposal.setChangeType(ChangeType.CREATE_ENTITY); - ingestResults.add( - entityService.ingestProposal( - opContext, versionSetKeyProposal, opContext.getAuditStamp(), false)); - sortId = INITIAL_VERSION_SORT_ID; - versionSetConstraint = -1L; } else { SystemAspect versionSetPropertiesAspect = aspectRetriever.getLatestSystemAspect(versionSet, VERSION_SET_PROPERTIES_ASPECT_NAME); VersionSetProperties versionSetProperties = RecordUtils.toRecordTemplate( VersionSetProperties.class, versionSetPropertiesAspect.getRecordTemplate().data()); - versionSetConstraint = - versionSetPropertiesAspect - .getSystemMetadataVersion() - .orElse(versionSetPropertiesAspect.getVersion()); + + if (versionSetProperties.getVersioningScheme() + != VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) { + throw new IllegalArgumentException( + "Only versioning scheme supported is ALPHANUMERIC_GENERATED_BY_DATAHUB"); + } + SystemAspect latestVersion = aspectRetriever.getLatestSystemAspect( versionSetProperties.getLatest(), VERSION_PROPERTIES_ASPECT_NAME); VersionProperties latestVersionProperties = RecordUtils.toRecordTemplate( VersionProperties.class, latestVersion.getRecordTemplate().data()); - // When more impls for versioning scheme are set up, this will need to be resolved to the - // correct scheme generation strategy sortId = AlphanumericSortIdGenerator.increment(latestVersionProperties.getSortId()); } @@ -154,9 +142,9 @@ public List linkLatestVersion( .setComment(inputProperties.getComment(), SetMode.IGNORE_NULL) .setVersion(versionTag) .setMetadataCreatedTimestamp(opContext.getAuditStamp()) - .setSortId(sortId); + .setSortId(sortId) + .setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); if (inputProperties.getSourceCreationTimestamp() != null) { - AuditStamp sourceCreatedAuditStamp = new AuditStamp().setTime(inputProperties.getSourceCreationTimestamp()); Urn actor = null; @@ -182,36 +170,11 @@ public List linkLatestVersion( headerMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionPropertiesConstraint.toString()); versionPropertiesProposal.setChangeType(ChangeType.UPSERT); versionPropertiesProposal.setHeaders(headerMap); - ingestResults.add( - entityService.ingestProposal( - opContext, versionPropertiesProposal, opContext.getAuditStamp(), false)); - // Might want to refactor this to a Patch w/ Create if not exists logic if more properties get - // added - // to Version Set Properties - VersionSetProperties versionSetProperties = - new VersionSetProperties() - .setVersioningScheme( - VersioningScheme - .ALPHANUMERIC_GENERATED_BY_DATAHUB) // Only one available, will need to add to - // input properties once more are added. - .setLatest(newLatestVersion); - MetadataChangeProposal versionSetPropertiesProposal = new MetadataChangeProposal(); - versionSetPropertiesProposal.setEntityUrn(versionSet); - versionSetPropertiesProposal.setEntityType(VERSION_SET_ENTITY_NAME); - versionSetPropertiesProposal.setAspectName(VERSION_SET_PROPERTIES_ASPECT_NAME); - versionSetPropertiesProposal.setAspect( - GenericRecordUtils.serializeAspect(versionSetProperties)); - versionSetPropertiesProposal.setChangeType(ChangeType.UPSERT); - StringMap versionSetHeaderMap = new StringMap(); - versionSetHeaderMap.put(HTTP_HEADER_IF_VERSION_MATCH, versionSetConstraint.toString()); - versionSetPropertiesProposal.setHeaders(versionSetHeaderMap); - versionSetPropertiesProposal.setSystemMetadata(systemMetadata); - ingestResults.add( + IngestResult result = entityService.ingestProposal( - opContext, versionSetPropertiesProposal, opContext.getAuditStamp(), false)); - - return ingestResults; + opContext, versionPropertiesProposal, opContext.getAuditStamp(), false); + return result != null ? List.of(result) : List.of(); } /** diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffect.java new file mode 100644 index 0000000000000..328a605ce43bf --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffect.java @@ -0,0 +1,204 @@ +package com.linkedin.metadata.entity.versioning.sideeffects; + +import static com.linkedin.metadata.Constants.*; + +import com.datahub.util.RecordUtils; +import com.linkedin.common.VersionProperties; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.entity.Aspect; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.RetrieverContext; +import com.linkedin.metadata.aspect.batch.ChangeMCP; +import com.linkedin.metadata.aspect.batch.MCLItem; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.patch.GenericJsonPatch; +import com.linkedin.metadata.aspect.patch.PatchOperationType; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.aspect.plugins.hooks.MCPSideEffect; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.versionset.VersionSetProperties; +import java.util.Collection; +import java.util.List; +import java.util.stream.Stream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Getter; +import lombok.Setter; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; + +/** + * Side effect that updates the isLatest property for the referenced versioned entity's Version + * Properties aspect. + */ +@Slf4j +@Getter +@Setter +@Accessors(chain = true) +public class VersionPropertiesSideEffect extends MCPSideEffect { + @Nonnull private AspectPluginConfig config; + + @Override + protected Stream applyMCPSideEffect( + Collection changeMCPS, @Nonnull RetrieverContext retrieverContext) { + return changeMCPS.stream().flatMap(item -> processMCP(item, retrieverContext)); + } + + @Override + protected Stream postMCPSideEffect( + Collection mclItems, @Nonnull RetrieverContext retrieverContext) { + return Stream.of(); + } + + private static Stream processMCP( + ChangeMCP changeMCP, @Nonnull RetrieverContext retrieverContext) { + Urn entityUrn = changeMCP.getUrn(); + + if (!VERSION_PROPERTIES_ASPECT_NAME.equals(changeMCP.getAspectName())) { + return Stream.empty(); + } + + VersionProperties versionProperties = changeMCP.getAspect(VersionProperties.class); + if (versionProperties == null) { + log.error("Unable to process version properties for urn: {}", changeMCP.getUrn()); + return Stream.empty(); + } + + Urn versionSetUrn = versionProperties.getVersionSet(); + Aspect versionSetPropertiesAspect = + retrieverContext + .getAspectRetriever() + .getLatestAspectObject(versionSetUrn, VERSION_SET_PROPERTIES_ASPECT_NAME); + if (versionSetPropertiesAspect == null) { + return createVersionSet(versionProperties, changeMCP, retrieverContext); + } + + // Version set exists -- only update if there is a new latest + VersionSetProperties versionSetProperties = + RecordUtils.toRecordTemplate(VersionSetProperties.class, versionSetPropertiesAspect.data()); + Urn prevLatest = versionSetProperties.getLatest(); + if (prevLatest.equals(entityUrn)) { + return Stream.empty(); + } + + VersionProperties prevLatestVersionProperties = null; + Aspect prevLatestVersionPropertiesAspect = + retrieverContext + .getAspectRetriever() + .getLatestAspectObject(prevLatest, VERSION_PROPERTIES_ASPECT_NAME); + if (prevLatestVersionPropertiesAspect != null) { + prevLatestVersionProperties = + RecordUtils.toRecordTemplate( + VersionProperties.class, prevLatestVersionPropertiesAspect.data()); + if (versionProperties.getSortId().compareTo(prevLatestVersionProperties.getSortId()) <= 0) { + return Stream.empty(); + } + } + + // New version properties is the new latest + return updateVersionSetLatest( + versionProperties, prevLatestVersionProperties, prevLatest, changeMCP, retrieverContext); + } + + private static Stream createVersionSet( + @Nonnull VersionProperties versionProperties, + ChangeMCP changeMCP, + @Nonnull RetrieverContext retrieverContext) { + versionProperties.setIsLatest(true); + + Urn entityUrn = changeMCP.getUrn(); + Urn versionSetUrn = versionProperties.getVersionSet(); + + AspectSpec keyAspectSpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(VERSION_SET_ENTITY_NAME) + .getKeyAspectSpec(); + RecordTemplate versionSetKey = + EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec); + ChangeMCP createVersionSetKey = + ChangeItemImpl.builder() + .urn(versionSetUrn) + .aspectName(VERSION_SET_KEY_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .recordTemplate(versionSetKey) + .auditStamp(changeMCP.getAuditStamp()) + .systemMetadata(changeMCP.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever()); + + VersionSetProperties versionSetPropertiesWithNewLatest = + new VersionSetProperties() + .setVersioningScheme(versionProperties.getVersioningScheme()) + .setLatest(entityUrn); + ChangeMCP createVersionSetProperties = + ChangeItemImpl.builder() + .urn(versionSetUrn) + .aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .recordTemplate(versionSetPropertiesWithNewLatest) + .auditStamp(changeMCP.getAuditStamp()) + .systemMetadata(changeMCP.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever()); + + return Stream.of(createVersionSetKey, createVersionSetProperties); + } + + private static Stream updateVersionSetLatest( + @Nonnull VersionProperties versionProperties, + @Nullable VersionProperties prevLatestVersionProperties, + @Nonnull Urn prevLatest, + ChangeMCP changeMCP, + @Nonnull RetrieverContext retrieverContext) { + versionProperties.setIsLatest(true); + + Urn entityUrn = changeMCP.getUrn(); + Urn versionSetUrn = versionProperties.getVersionSet(); + + VersionSetProperties versionSetPropertiesWithNewLatest = + new VersionSetProperties() + .setVersioningScheme(versionProperties.getVersioningScheme()) + .setLatest(entityUrn); + ChangeMCP updateVersionSetProperties = + ChangeItemImpl.builder() + .urn(versionSetUrn) + .aspectName(VERSION_SET_PROPERTIES_ASPECT_NAME) + .changeType(ChangeType.UPSERT) + .recordTemplate(versionSetPropertiesWithNewLatest) + .auditStamp(changeMCP.getAuditStamp()) + .systemMetadata(changeMCP.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever()); + + if (prevLatestVersionProperties == null) { + return Stream.of(updateVersionSetProperties); + } + + EntitySpec entitySpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(prevLatest.getEntityType()); + GenericJsonPatch.PatchOp patchOp = new GenericJsonPatch.PatchOp(); + patchOp.setOp(PatchOperationType.ADD.getValue()); + patchOp.setPath("/isLatest"); + patchOp.setValue(false); + ChangeMCP updateOldLatestVersionProperties = + PatchItemImpl.builder() + .urn(prevLatest) + .entitySpec(entitySpec) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .patch(GenericJsonPatch.builder().patch(List.of(patchOp)).build().getJsonPatch()) + .auditStamp(changeMCP.getAuditStamp()) + .systemMetadata(changeMCP.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry()) + .applyPatch(prevLatestVersionProperties, retrieverContext.getAspectRetriever()); + + return Stream.of(updateVersionSetProperties, updateOldLatestVersionProperties); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionSetSideEffect.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionSetSideEffect.java index 5b26f5c8b11dd..e21f627d284f6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionSetSideEffect.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionSetSideEffect.java @@ -124,21 +124,26 @@ private static Stream updateLatest( .getEntitySpec(newLatest.getEntityType()); VersionProperties newLatestProperties = RecordUtils.toRecordTemplate(VersionProperties.class, newLatestEntity.data()); - GenericJsonPatch.PatchOp currentPatch = new GenericJsonPatch.PatchOp(); - currentPatch.setOp(PatchOperationType.ADD.getValue()); - currentPatch.setPath("/isLatest"); - currentPatch.setValue(true); - mcpItems.add( - PatchItemImpl.builder() - .urn(newLatest) - .entitySpec(entitySpec) - .aspectName(VERSION_PROPERTIES_ASPECT_NAME) - .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) - .patch(GenericJsonPatch.builder().patch(List.of(currentPatch)).build().getJsonPatch()) - .auditStamp(changeMCP.getAuditStamp()) - .systemMetadata(changeMCP.getSystemMetadata()) - .build(retrieverContext.getAspectRetriever().getEntityRegistry()) - .applyPatch(newLatestProperties, retrieverContext.getAspectRetriever())); + + if (Boolean.FALSE.equals(newLatestProperties.isIsLatest())) { + GenericJsonPatch.PatchOp currentPatch = new GenericJsonPatch.PatchOp(); + currentPatch.setOp(PatchOperationType.ADD.getValue()); + currentPatch.setPath("/isLatest"); + currentPatch.setValue(true); + mcpItems.add( + PatchItemImpl.builder() + .urn(newLatest) + .entitySpec(entitySpec) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .patch( + GenericJsonPatch.builder().patch(List.of(currentPatch)).build().getJsonPatch()) + .auditStamp(changeMCP.getAuditStamp()) + .systemMetadata(changeMCP.getSystemMetadata()) + .build(retrieverContext.getAspectRetriever().getEntityRegistry()) + .applyPatch(newLatestProperties, retrieverContext.getAspectRetriever())); + } + return mcpItems.stream(); } return Stream.empty(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidator.java index 53f4d9b1b3dce..41a145673a53c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidator.java @@ -1,11 +1,6 @@ package com.linkedin.metadata.entity.versioning.validation; -import static com.linkedin.metadata.Constants.VERSION_LABEL_FIELD_NAME; -import static com.linkedin.metadata.Constants.VERSION_PROPERTIES_ASPECT_NAME; -import static com.linkedin.metadata.Constants.VERSION_SET_FIELD_NAME; -import static com.linkedin.metadata.Constants.VERSION_SET_KEY_ASPECT_NAME; -import static com.linkedin.metadata.Constants.VERSION_SET_PROPERTIES_ASPECT_NAME; -import static com.linkedin.metadata.Constants.VERSION_SORT_ID_FIELD_NAME; +import static com.linkedin.metadata.Constants.*; import com.datahub.util.RecordUtils; import com.google.common.annotations.VisibleForTesting; @@ -25,6 +20,7 @@ import com.linkedin.metadata.entity.SearchRetriever; import com.linkedin.metadata.entity.ebean.batch.PatchItemImpl; import com.linkedin.metadata.key.VersionSetKey; +import com.linkedin.metadata.models.AspectSpec; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.SortCriterion; @@ -33,6 +29,7 @@ import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.utils.QueryUtils; import com.linkedin.metadata.utils.CriterionUtils; +import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.versionset.VersionSetProperties; import com.linkedin.versionset.VersioningScheme; import java.util.Collection; @@ -98,21 +95,28 @@ public static Stream validatePropertiesUpserts( Collections.singleton(versionSetUrn), ImmutableSet.of(VERSION_SET_KEY_ASPECT_NAME, VERSION_SET_PROPERTIES_ASPECT_NAME)) .get(versionSetUrn); - if (aspects == null || aspects.isEmpty()) { - exceptions.addException(mcpItem, "Version Set specified does not exist: " + versionSetUrn); - continue; + + final AspectSpec keyAspectSpec = + retrieverContext + .getAspectRetriever() + .getEntityRegistry() + .getEntitySpec(VERSION_SET_ENTITY_NAME) + .getKeyAspectSpec(); + VersionSetKey versionSetKey = + Optional.ofNullable(aspects) + .map(a -> aspects.get(VERSION_SET_KEY_ASPECT_NAME)) + .map(a -> RecordUtils.toRecordTemplate(VersionSetKey.class, a.data())) + .orElse( + (VersionSetKey) + EntityKeyUtils.convertUrnToEntityKey(versionSetUrn, keyAspectSpec)); + if (!mcpItem.getEntitySpec().getName().equals(versionSetKey.getEntityType())) { + exceptions.addException( + mcpItem, + "Version Set specified entity type does not match, expected type: " + + versionSetKey.getEntityType()); } - Optional keyAspect = Optional.ofNullable(aspects.get(VERSION_SET_KEY_ASPECT_NAME)); - if (keyAspect.isPresent()) { - VersionSetKey versionSetKey = - RecordUtils.toRecordTemplate(VersionSetKey.class, keyAspect.get().data()); - if (!mcpItem.getEntitySpec().getName().equals(versionSetKey.getEntityType())) { - exceptions.addException( - mcpItem, - "Version Set specified entity type does not match, expected type: " - + versionSetKey.getEntityType()); - } + if (aspects != null && !aspects.isEmpty()) { // Validate sort ID scheme String sortId = versionProperties.getSortId(); Optional versionSetPropertiesAspect = @@ -123,7 +127,18 @@ public static Stream validatePropertiesUpserts( RecordUtils.toRecordTemplate( VersionSetProperties.class, versionSetPropertiesAspect.get().data()); VersioningScheme versioningScheme = versionSetProperties.getVersioningScheme(); + if (!versioningScheme.equals(versionProperties.getVersioningScheme())) { + exceptions.addException( + mcpItem, + "Versioning Scheme does not match Version Set properties. Expected Scheme: " + + versioningScheme + + " Provided Scheme: " + + versionProperties.getVersioningScheme()); + } + switch (versioningScheme) { + case LEXICOGRAPHIC_STRING: // No validation + break; case ALPHANUMERIC_GENERATED_BY_DATAHUB: validateDataHubGeneratedScheme(sortId, exceptions, mcpItem); break; @@ -131,8 +146,6 @@ public static Stream validatePropertiesUpserts( exceptions.addException(mcpItem, "Unsupported scheme type: " + versioningScheme); } } - } else { - exceptions.addException(mcpItem, "Version Set specified does not exist: " + versionSetUrn); } // Best effort validate on uniqueness for sort ID and version label, search has potential diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceTest.java index a88dd4158503b..da7b31d944382 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/EntityVersioningServiceTest.java @@ -99,7 +99,6 @@ public void setup() throws EntityRegistryException { @Test public void testLinkLatestVersionNewVersionSet() throws Exception { - VersionPropertiesInput input = new VersionPropertiesInput("Test comment", "Test label", 123456789L, "testCreator"); // Mock version set doesn't exist @@ -134,21 +133,31 @@ public void testLinkLatestVersionNewVersionSet() throws Exception { assertEquals(versionProps.getSortId(), INITIAL_VERSION_SORT_ID); assertEquals(versionProps.getComment(), "Test comment"); assertEquals(versionProps.getVersionSet(), TEST_VERSION_SET_URN); + } - MetadataChangeProposal versionSetPropertiesProposal = - capturedAspects.stream() - .filter(mcpItem -> VERSION_SET_PROPERTIES_ASPECT_NAME.equals(mcpItem.getAspectName())) - .collect(Collectors.toList()) - .get(0); - VersionSetProperties versionSetProperties = - GenericRecordUtils.deserializeAspect( - versionSetPropertiesProposal.getAspect().getValue(), - versionSetPropertiesProposal.getAspect().getContentType(), - VersionSetProperties.class); - assertEquals(versionSetProperties.getLatest(), TEST_DATASET_URN); - assertEquals( - versionSetProperties.getVersioningScheme(), - VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); + @Test + public void testLinkInvalidVersioningScheme() throws Exception { + VersionPropertiesInput input = + new VersionPropertiesInput("Test comment", "Test label", 123456789L, "testCreator"); + // Mock version set exists with invalid versioning scheme + when(mockAspectRetriever.entityExists(anySet())).thenReturn(Map.of(TEST_VERSION_SET_URN, true)); + VersionSetProperties existingVersionSetProps = + new VersionSetProperties() + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setLatest(TEST_DATASET_URN); + SystemAspect mockVersionSetPropertiesAspect = mock(SystemAspect.class); + when(mockVersionSetPropertiesAspect.getRecordTemplate()).thenReturn(existingVersionSetProps); + when(mockVersionSetPropertiesAspect.getSystemMetadataVersion()).thenReturn(Optional.of(1L)); + when(mockAspectRetriever.getLatestSystemAspect(eq(TEST_VERSION_SET_URN), anyString())) + .thenReturn(mockVersionSetPropertiesAspect); + + // Execute + assertThrows( + IllegalArgumentException.class, + () -> { + versioningService.linkLatestVersion( + mockOpContext, TEST_VERSION_SET_URN, TEST_DATASET_URN, input); + }); } @Test @@ -174,6 +183,7 @@ public void testLinkLatestVersionExistingVersionSet() throws Exception { // Mock existing version properties with a sort ID VersionProperties existingVersionProps = new VersionProperties() + .setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB) .setSortId("AAAAAAAA") .setVersion(new VersionTag().setVersionTag("Label1")) .setVersionSet(TEST_VERSION_SET_URN); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffectTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffectTest.java new file mode 100644 index 0000000000000..0128c4ef048ce --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/sideeffects/VersionPropertiesSideEffectTest.java @@ -0,0 +1,368 @@ +package com.linkedin.metadata.entity.versioning.sideeffects; + +import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.search.elasticsearch.indexbuilder.SettingsBuilder.*; +import static org.mockito.Mockito.mock; +import static org.testng.Assert.*; + +import com.linkedin.common.GlobalTags; +import com.linkedin.common.TagAssociationArray; +import com.linkedin.common.VersionProperties; +import com.linkedin.common.VersionTag; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.aspect.GraphRetriever; +import com.linkedin.metadata.aspect.SystemAspect; +import com.linkedin.metadata.aspect.batch.MCPItem; +import com.linkedin.metadata.aspect.plugins.config.AspectPluginConfig; +import com.linkedin.metadata.entity.SearchRetriever; +import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl; +import com.linkedin.metadata.entity.ebean.batch.MCLItemImpl; +import com.linkedin.metadata.key.VersionSetKey; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.test.metadata.aspect.MockAspectRetriever; +import com.linkedin.test.metadata.aspect.TestEntityRegistry; +import com.linkedin.versionset.VersionSetProperties; +import com.linkedin.versionset.VersioningScheme; +import io.datahubproject.metadata.context.RetrieverContext; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class VersionPropertiesSideEffectTest { + private static final TestEntityRegistry TEST_REGISTRY = new TestEntityRegistry(); + + // Standard version set with a single entity in it, `PREVIOUS_LATEST_URN` + private static final Urn HAS_SET_PROPERTIES_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(has-properties,dataset)"); + + // Version set urn with a VersionSetKey but no VersionSetProperties + private static final Urn MISSING_SET_PROPERTIES_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(missing-properties,mlModel)"); + + // Version set urn that does not exist + private static final Urn NON_EXISTENT_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(does-not-exist,dataset)"); + + // Its latest urn does not have a version properties aspect + private static final Urn INVALID_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(invalid-properties,dataset)"); + + private static final Urn PREVIOUS_LATEST_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,previous,PROD)"); + private static final Urn MISSING_VERSION_PROPERTIES_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,invalid,PROD)"); + private static final Urn ENTITY_URN = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,entity,PROD)"); + private static final Urn ML_MODEL_URN = + UrnUtils.getUrn("urn:li:mlModel:(urn:li:dataPlatform:mlflow,model,PROD)"); + + private static final AspectPluginConfig TEST_PLUGIN_CONFIG = + AspectPluginConfig.builder() + .className(VersionSetSideEffect.class.getName()) + .enabled(true) + .supportedOperations( + List.of("CREATE", "PATCH", "CREATE_ENTITY", "UPSERT", "DELETE", "RESTATE")) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .build())) + .build(); + + private MockAspectRetriever mockAspectRetriever; + private RetrieverContext retrieverContext; + private VersionPropertiesSideEffect sideEffect; + + @BeforeMethod + public void setup() { + VersionProperties previousLatestVersionProperties = + new VersionProperties() + .setVersionSet(HAS_SET_PROPERTIES_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("v1")) + .setSortId("abc"); + VersionSetProperties existingVersionSetProperties = + new VersionSetProperties() + .setLatest(PREVIOUS_LATEST_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); + VersionSetProperties invalidVersionSetProperties = + new VersionSetProperties() + .setLatest(MISSING_VERSION_PROPERTIES_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); + VersionSetKey existingVersionSetKey = + new VersionSetKey().setId("missing-properties-exists").setEntityType(DATASET_ENTITY_NAME); + + Map> data = new HashMap<>(); + data.put(PREVIOUS_LATEST_URN, List.of(previousLatestVersionProperties)); + data.put(HAS_SET_PROPERTIES_VERSION_SET_URN, List.of(existingVersionSetProperties)); + data.put(MISSING_SET_PROPERTIES_VERSION_SET_URN, List.of(existingVersionSetKey)); + data.put(INVALID_VERSION_SET_URN, List.of(invalidVersionSetProperties)); + mockAspectRetriever = new MockAspectRetriever(data); + mockAspectRetriever.setEntityRegistry(TEST_REGISTRY); + + retrieverContext = + RetrieverContext.builder() + .searchRetriever(mock(SearchRetriever.class)) + .aspectRetriever(mockAspectRetriever) + .graphRetriever(mock(GraphRetriever.class)) + .build(); + + sideEffect = new VersionPropertiesSideEffect(); + sideEffect.setConfig(TEST_PLUGIN_CONFIG); + } + + @Test + public void testCreateVersionSet() { + // Create version set if it does not exist + VersionProperties properties = + new VersionProperties() + .setVersionSet(NON_EXISTENT_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("version")) + .setSortId("abc"); + + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(ENTITY_URN) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .recordTemplate(properties) + .previousSystemAspect(mock(SystemAspect.class)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + // Run side effect + List sideEffectResults = + sideEffect + .applyMCPSideEffect(Collections.singletonList(changeItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify results + assert properties.isIsLatest(); + assertEquals(sideEffectResults.size(), 2, "Expected two mcps: key and set properties"); + + MCPItem keyMCP = sideEffectResults.get(0); + assertEquals(keyMCP.getUrn(), NON_EXISTENT_VERSION_SET_URN); + VersionSetKey versionSetKey = keyMCP.getAspect(VersionSetKey.class); + assertNotNull(versionSetKey); + assertEquals(versionSetKey.getId(), "does-not-exist"); + assertEquals(versionSetKey.getEntityType(), DATASET_ENTITY_NAME); + + MCPItem setPropertiesMCP = sideEffectResults.get(1); + assertEquals(setPropertiesMCP.getUrn(), NON_EXISTENT_VERSION_SET_URN); + VersionSetProperties versionSetProperties = + setPropertiesMCP.getAspect(VersionSetProperties.class); + assertNotNull(versionSetProperties); + assertEquals(versionSetProperties.getLatest(), ENTITY_URN); + assertEquals(versionSetProperties.getVersioningScheme(), VersioningScheme.LEXICOGRAPHIC_STRING); + } + + @Test + public void testUpdateLatest() { + // Upsert version set properties with new latest; update old latest version properties + VersionProperties properties = + new VersionProperties() + .setVersionSet(HAS_SET_PROPERTIES_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("version")) + .setSortId("bbb"); + + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(ENTITY_URN) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .recordTemplate(properties) + .previousSystemAspect(mock(SystemAspect.class)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + // Run side effect + List sideEffectResults = + sideEffect + .applyMCPSideEffect(Collections.singletonList(changeItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify results + assert properties.isIsLatest(); + assertEquals( + sideEffectResults.size(), + 2, + "Expected two mcps: set properties and old latest version properties"); + + MCPItem setPropertiesMCP = sideEffectResults.get(0); + assertEquals(setPropertiesMCP.getUrn(), HAS_SET_PROPERTIES_VERSION_SET_URN); + VersionSetProperties versionSetProperties = + setPropertiesMCP.getAspect(VersionSetProperties.class); + assertNotNull(versionSetProperties); + assertEquals(versionSetProperties.getLatest(), ENTITY_URN); + assertEquals(versionSetProperties.getVersioningScheme(), VersioningScheme.LEXICOGRAPHIC_STRING); + + MCPItem oldLatestMCP = sideEffectResults.get(1); + assertEquals(oldLatestMCP.getUrn(), PREVIOUS_LATEST_URN); + VersionProperties oldLatestVersionProperties = oldLatestMCP.getAspect(VersionProperties.class); + assertNotNull(oldLatestVersionProperties); + assertFalse(oldLatestVersionProperties.isIsLatest()); + } + + @Test + public void testNotNewLatest() { + // Do nothing if not changing latest + VersionProperties properties = + new VersionProperties() + .setVersionSet(HAS_SET_PROPERTIES_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("version")) + .setSortId("aaa"); + + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(ENTITY_URN) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .recordTemplate(properties) + .previousSystemAspect(mock(SystemAspect.class)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + // Run side effect + List sideEffectResults = + sideEffect + .applyMCPSideEffect(Collections.singletonList(changeItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify results + assert !properties.isIsLatest(); + assertEquals(sideEffectResults.size(), 0, "Expected no operations"); + } + + @Test + public void testCreateVersionSetKeyExists() { + // Create version set properties if entity exists but properties aspect does not + VersionProperties properties = + new VersionProperties() + .setVersionSet(MISSING_SET_PROPERTIES_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("version")) + .setSortId("abc"); + + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(ML_MODEL_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(ML_MODEL_URN) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .recordTemplate(properties) + .previousSystemAspect(mock(SystemAspect.class)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + // Run side effect + List sideEffectResults = + sideEffect + .applyMCPSideEffect(Collections.singletonList(changeItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify results + assert properties.isIsLatest(); + assertEquals(sideEffectResults.size(), 2, "Expected two mcps: key and set properties"); + + MCPItem keyMCP = sideEffectResults.get(0); + assertEquals(keyMCP.getUrn(), MISSING_SET_PROPERTIES_VERSION_SET_URN); + VersionSetKey versionSetKey = keyMCP.getAspect(VersionSetKey.class); + assertNotNull(versionSetKey); + assertEquals(versionSetKey.getId(), "missing-properties"); + assertEquals(versionSetKey.getEntityType(), ML_MODEL_ENTITY_NAME); + + MCPItem setPropertiesMCP = sideEffectResults.get(1); + assertEquals(setPropertiesMCP.getUrn(), MISSING_SET_PROPERTIES_VERSION_SET_URN); + VersionSetProperties versionSetProperties = + setPropertiesMCP.getAspect(VersionSetProperties.class); + assertNotNull(versionSetProperties); + assertEquals(versionSetProperties.getLatest(), ML_MODEL_URN); + assertEquals(versionSetProperties.getVersioningScheme(), VersioningScheme.LEXICOGRAPHIC_STRING); + } + + @Test + public void testUpdateLatestInvalidPreviousLatest() { + // Upsert version set properties with new latest; update old latest version properties + VersionProperties properties = + new VersionProperties() + .setVersionSet(INVALID_VERSION_SET_URN) + .setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING) + .setVersion(new VersionTag().setVersionTag("version")) + .setSortId("bbb"); + + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(ENTITY_URN) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(VERSION_PROPERTIES_ASPECT_NAME)) + .recordTemplate(properties) + .previousSystemAspect(mock(SystemAspect.class)) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + // Run side effect + List sideEffectResults = + sideEffect + .applyMCPSideEffect(Collections.singletonList(changeItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify results + assert properties.isIsLatest(); + assertEquals(sideEffectResults.size(), 1, "Expected one mcps: set properties"); + + MCPItem setPropertiesMCP = sideEffectResults.get(0); + assertEquals(setPropertiesMCP.getUrn(), INVALID_VERSION_SET_URN); + VersionSetProperties versionSetProperties = + setPropertiesMCP.getAspect(VersionSetProperties.class); + assertEquals(versionSetProperties.getLatest(), ENTITY_URN); + assertEquals(versionSetProperties.getVersioningScheme(), VersioningScheme.LEXICOGRAPHIC_STRING); + } + + @Test + public void testNoChangesForNonVersionSetProperties() { + // Create some other type of aspect change + EntitySpec entitySpec = TEST_REGISTRY.getEntitySpec(DATASET_ENTITY_NAME); + ChangeItemImpl changeItem = + ChangeItemImpl.builder() + .urn(MISSING_VERSION_PROPERTIES_URN) + .aspectName(GLOBAL_TAGS_ASPECT_NAME) + .entitySpec(entitySpec) + .aspectSpec(entitySpec.getAspectSpec(GLOBAL_TAGS_ASPECT_NAME)) + .recordTemplate(new GlobalTags().setTags(new TagAssociationArray())) + .auditStamp(AuditStampUtils.createDefaultAuditStamp()) + .build(mockAspectRetriever); + + MCLItemImpl mclItem = + MCLItemImpl.builder().build(changeItem, null, null, retrieverContext.getAspectRetriever()); + + // Run side effect + List sideEffectResults = + sideEffect + .postMCPSideEffect(Collections.singletonList(mclItem), retrieverContext) + .collect(Collectors.toList()); + + // Verify no changes for non-version set properties aspects + assertEquals( + sideEffectResults.size(), 0, "Expected no changes for non-version set properties aspect"); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidatorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidatorTest.java index 95bdd73adb901..3eabf25ee8286 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidatorTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/versioning/validation/VersionPropertiesValidatorTest.java @@ -1,6 +1,6 @@ package com.linkedin.metadata.entity.versioning.validation; -import static com.linkedin.metadata.Constants.CHART_ENTITY_NAME; +import static com.linkedin.metadata.Constants.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; @@ -37,8 +37,12 @@ public class VersionPropertiesValidatorTest { private static final String ENTITY_TYPE = "dataset"; - private static final Urn TEST_VERSION_SET_URN = - UrnUtils.getUrn("urn:li:versionSet:(12356,dataset)"); + private static final Urn MANAGED_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(managed,dataset)"); + private static final Urn LEXICOGRAPHIC_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(lexicographic,dataset)"); + private static final Urn ML_MODEL_VERSION_SET_URN = + UrnUtils.getUrn("urn:li:versionSet:(managed,mlModel)"); private static final Urn TEST_ENTITY_URN = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"); @@ -55,17 +59,26 @@ public void setup() { .thenReturn(new ScrollResult().setEntities(new SearchEntityArray())); mockGraphRetriever = Mockito.mock(GraphRetriever.class); - // Create version set key and properties - VersionSetKey versionSetKey = new VersionSetKey(); - versionSetKey.setEntityType(ENTITY_TYPE); - - VersionSetProperties versionSetProperties = new VersionSetProperties(); - versionSetProperties.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); - // Initialize mock aspect retriever with version set data Map> data = new HashMap<>(); - data.put(TEST_VERSION_SET_URN, Arrays.asList(versionSetKey, versionSetProperties)); + data.put( + MANAGED_VERSION_SET_URN, + Arrays.asList( + new VersionSetKey().setEntityType(DATASET_ENTITY_NAME), + new VersionSetProperties() + .setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB))); + data.put( + LEXICOGRAPHIC_VERSION_SET_URN, + Arrays.asList( + new VersionSetKey().setEntityType(DATASET_ENTITY_NAME), + new VersionSetProperties().setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING))); + data.put( + ML_MODEL_VERSION_SET_URN, + Arrays.asList( + new VersionSetKey().setEntityType(ML_MODEL_ENTITY_NAME), + new VersionSetProperties().setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING))); mockAspectRetriever = new MockAspectRetriever(data); + mockAspectRetriever.setEntityRegistry(new TestEntityRegistry()); retrieverContext = io.datahubproject.metadata.context.RetrieverContext.builder() @@ -78,7 +91,8 @@ public void setup() { @Test public void testValidVersionProperties() { VersionProperties properties = new VersionProperties(); - properties.setVersionSet(TEST_VERSION_SET_URN); + properties.setVersionSet(MANAGED_VERSION_SET_URN); + properties.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); properties.setSortId("ABCDEFGH"); // Valid 8-char uppercase alpha properties.setVersion(new VersionTag().setVersionTag("123")); @@ -93,7 +107,8 @@ public void testValidVersionProperties() { @Test public void testInvalidSortId() { VersionProperties properties = new VersionProperties(); - properties.setVersionSet(TEST_VERSION_SET_URN); + properties.setVersionSet(MANAGED_VERSION_SET_URN); + properties.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); properties.setSortId("123"); // Invalid - not 8 chars, not alpha properties.setVersion(new VersionTag().setVersionTag("123")); @@ -108,12 +123,30 @@ public void testInvalidSortId() { } @Test - public void testNonexistentVersionSet() { - Urn nonexistentUrn = UrnUtils.getUrn("urn:li:versionSet:(nonexistent,dataset)"); + public void testSortIdValidLexicographic() { + VersionProperties properties = new VersionProperties(); + properties.setVersionSet(LEXICOGRAPHIC_VERSION_SET_URN); + properties.setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); + properties.setSortId("123"); + properties.setVersion(new VersionTag().setVersionTag("123")); + + Stream validationResult = + VersionPropertiesValidator.validatePropertiesUpserts( + TestMCP.ofOneUpsertItem(TEST_ENTITY_URN, properties, new TestEntityRegistry()), + retrieverContext); + + var exceptions = validationResult.findAny(); + Assert.assertTrue( + exceptions.isEmpty(), exceptions.map(AspectValidationException::getMessage).orElse(null)); + } + @Test + public void testVersioningSchemeMismatch() { VersionProperties properties = new VersionProperties(); - properties.setVersionSet(nonexistentUrn); + properties.setVersionSet(MANAGED_VERSION_SET_URN); + properties.setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); properties.setSortId("ABCDEFGH"); + properties.setVersion(new VersionTag().setVersionTag("123")); Stream validationResult = VersionPropertiesValidator.validatePropertiesUpserts( @@ -122,31 +155,38 @@ public void testNonexistentVersionSet() { AspectValidationException exception = validationResult.findAny().get(); Assert.assertNotNull(exception); - Assert.assertTrue(exception.getMessage().contains("Version Set specified does not exist")); + Assert.assertTrue( + exception.getMessage().contains("Versioning Scheme does not match Version Set properties")); } @Test - public void testEntityTypeMismatch() { - // Create version set with different entity type - VersionSetKey wrongTypeKey = new VersionSetKey(); - wrongTypeKey.setEntityType(CHART_ENTITY_NAME); + public void testNonexistentVersionSet() { + // Non-existent version set gets created by VersionPropertiesSideEffect + Urn nonexistentUrn = UrnUtils.getUrn("urn:li:versionSet:(nonexistent,dataset)"); - VersionSetProperties versionSetProperties = new VersionSetProperties(); - versionSetProperties.setVersioningScheme(VersioningScheme.ALPHANUMERIC_GENERATED_BY_DATAHUB); + VersionProperties properties = new VersionProperties(); + properties.setVersionSet(nonexistentUrn); + properties.setSortId("abc"); + properties.setVersion(new VersionTag().setVersionTag("123")); - Map> data = new HashMap<>(); - data.put(TEST_VERSION_SET_URN, Arrays.asList(wrongTypeKey, versionSetProperties)); - mockAspectRetriever = new MockAspectRetriever(data); + Stream validationResult = + VersionPropertiesValidator.validatePropertiesUpserts( + TestMCP.ofOneUpsertItem(TEST_ENTITY_URN, properties, new TestEntityRegistry()), + retrieverContext); - retrieverContext = - io.datahubproject.metadata.context.RetrieverContext.builder() - .aspectRetriever(mockAspectRetriever) - .searchRetriever(mockSearchRetriever) - .graphRetriever(mockGraphRetriever) - .build(); + var exceptions = validationResult.findAny(); + Assert.assertTrue( + exceptions.isEmpty(), exceptions.map(AspectValidationException::getMessage).orElse(null)); + } + + @Test + public void testEntityTypeMismatch() { + VersionSetProperties versionSetProperties = new VersionSetProperties(); + versionSetProperties.setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); VersionProperties properties = new VersionProperties(); - properties.setVersionSet(TEST_VERSION_SET_URN); + properties.setVersionSet(ML_MODEL_VERSION_SET_URN); + properties.setVersioningScheme(VersioningScheme.LEXICOGRAPHIC_STRING); properties.setSortId("ABCDEFGH"); properties.setVersion(new VersionTag().setVersionTag("123")); @@ -164,7 +204,7 @@ public void testEntityTypeMismatch() { @Test public void testIsLatestFieldSpecified() { VersionProperties properties = new VersionProperties(); - properties.setVersionSet(TEST_VERSION_SET_URN); + properties.setVersionSet(MANAGED_VERSION_SET_URN); properties.setSortId("ABCDEFGH"); properties.setIsLatest(true); // Should not be specified diff --git a/metadata-models/src/main/pegasus/com/linkedin/common/VersionProperties.pdl b/metadata-models/src/main/pegasus/com/linkedin/common/VersionProperties.pdl index af4d48debe021..18e5c3d869223 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/common/VersionProperties.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/common/VersionProperties.pdl @@ -1,5 +1,7 @@ namespace com.linkedin.common +import com.linkedin.versionset.VersioningScheme + /** * Properties about a versioned asset i.e. dataset, ML Model, etc. */ @@ -56,6 +58,12 @@ record VersionProperties { } sortId: string + /** + * What versioning scheme `sortId` belongs to. + * Defaults to a plain string that is lexicographically sorted. + */ + versioningScheme: VersioningScheme = "LEXICOGRAPHIC_STRING" + /** * Timestamp reflecting when this asset version was created in the source system. */ @@ -74,4 +82,4 @@ record VersionProperties { "fieldType": "BOOLEAN" } isLatest: optional boolean -} \ No newline at end of file +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/versionset/VersionSetProperties.pdl b/metadata-models/src/main/pegasus/com/linkedin/versionset/VersionSetProperties.pdl index 0e50c33aa2b7d..6cf5469f779e3 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/versionset/VersionSetProperties.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/versionset/VersionSetProperties.pdl @@ -18,7 +18,5 @@ record VersionSetProperties includes CustomProperties { /** * What versioning scheme is being utilized for the versioned entities sort criterion. Static once set */ - versioningScheme: enum VersioningScheme { - ALPHANUMERIC_GENERATED_BY_DATAHUB - } -} \ No newline at end of file + versioningScheme: VersioningScheme +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/versionset/VersioningScheme.pdl b/metadata-models/src/main/pegasus/com/linkedin/versionset/VersioningScheme.pdl new file mode 100644 index 0000000000000..f85c50431154c --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/versionset/VersioningScheme.pdl @@ -0,0 +1,13 @@ +namespace com.linkedin.versionset + +enum VersioningScheme { + /** + * String sorted lexicographically. + */ + LEXICOGRAPHIC_STRING, + + /** + * String managed by DataHub. Currently, an 8 character alphabetical string. + */ + ALPHANUMERIC_GENERATED_BY_DATAHUB +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java index 907f72b17e145..210e8311d9245 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/plugins/SpringStandardPluginConfiguration.java @@ -13,6 +13,7 @@ import com.linkedin.metadata.aspect.validation.UrnAnnotationValidator; import com.linkedin.metadata.aspect.validation.UserDeleteValidator; import com.linkedin.metadata.dataproducts.sideeffects.DataProductUnsetSideEffect; +import com.linkedin.metadata.entity.versioning.sideeffects.VersionPropertiesSideEffect; import com.linkedin.metadata.entity.versioning.sideeffects.VersionSetSideEffect; import com.linkedin.metadata.entity.versioning.validation.VersionPropertiesValidator; import com.linkedin.metadata.entity.versioning.validation.VersionSetPropertiesValidator; @@ -228,6 +229,24 @@ public AspectPayloadValidator versionSetPropertiesValidator() { .build()); } + @Bean + @ConditionalOnProperty(name = "featureFlags.entityVersioning", havingValue = "true") + public MCPSideEffect versionPropertiesSideEffect() { + return new VersionPropertiesSideEffect() + .setConfig( + AspectPluginConfig.builder() + .className(VersionPropertiesSideEffect.class.getName()) + .enabled(true) + .supportedOperations(List.of(UPSERT, UPDATE, PATCH, CREATE, CREATE_ENTITY)) + .supportedEntityAspectNames( + List.of( + AspectPluginConfig.EntityAspectName.builder() + .entityName(ALL) + .aspectName(VERSION_PROPERTIES_ASPECT_NAME) + .build())) + .build()); + } + @Bean @ConditionalOnProperty(name = "featureFlags.entityVersioning", havingValue = "true") public MCPSideEffect versionSetSideEffect() { diff --git a/smoke-test/tests/entity_versioning/test_versioning_ingest.py b/smoke-test/tests/entity_versioning/test_versioning_ingest.py new file mode 100644 index 0000000000000..159169b34d89e --- /dev/null +++ b/smoke-test/tests/entity_versioning/test_versioning_ingest.py @@ -0,0 +1,223 @@ +import pytest + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.graph.client import DataHubGraph +from datahub.metadata.schema_classes import ( + VersioningSchemeClass, + VersionPropertiesClass, + VersionSetPropertiesClass, + VersionTagClass, +) +from datahub.metadata.urns import DatasetUrn, VersionSetUrn +from tests.consistency_utils import wait_for_writes_to_sync + +OLD_LATEST_URN = DatasetUrn("v", "versioning_old_latest") +ENTITY_URN = DatasetUrn("v", "versioning_entity") +EXISTS_VERSION_SET_URN = VersionSetUrn("exists", DatasetUrn.ENTITY_TYPE) +NOT_EXISTS_VERSION_SET_URN = VersionSetUrn("not-exists", DatasetUrn.ENTITY_TYPE) +BULK_URNS = [DatasetUrn("v", f"versioning_entity_{i}").urn() for i in range(5, 15)] + + +@pytest.fixture(scope="function", autouse=True) +def ingest_cleanup_data(graph_client: DataHubGraph): + try: + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=OLD_LATEST_URN.urn(), + aspect=VersionPropertiesClass( + versionSet=EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag="first"), + sortId="abc", + versioningScheme=VersioningSchemeClass.LEXICOGRAPHIC_STRING, + ), + ) + ) + yield + finally: + graph_client.hard_delete_entity(EXISTS_VERSION_SET_URN.urn()) + graph_client.hard_delete_entity(NOT_EXISTS_VERSION_SET_URN.urn()) + graph_client.hard_delete_entity(ENTITY_URN.urn()) + graph_client.hard_delete_entity(OLD_LATEST_URN.urn()) + + +def test_ingest_version_properties(graph_client: DataHubGraph): + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=ENTITY_URN.urn(), + aspect=VersionPropertiesClass( + versionSet=NOT_EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag="first"), + sortId="abc", + versioningScheme=VersioningSchemeClass.LEXICOGRAPHIC_STRING, + ), + ) + ) + version_set_properties = graph_client.get_aspect( + NOT_EXISTS_VERSION_SET_URN.urn(), VersionSetPropertiesClass + ) + assert version_set_properties + assert version_set_properties.latest == ENTITY_URN.urn() + assert ( + version_set_properties.versioningScheme + == VersioningSchemeClass.LEXICOGRAPHIC_STRING + ) + + version_properties = graph_client.get_aspect( + ENTITY_URN.urn(), VersionPropertiesClass + ) + assert version_properties + assert version_properties.isLatest + + +def test_ingest_version_properties_alphanumeric(graph_client: DataHubGraph): + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=ENTITY_URN.urn(), + aspect=VersionPropertiesClass( + versionSet=NOT_EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag="first"), + sortId="abc", + versioningScheme=VersioningSchemeClass.ALPHANUMERIC_GENERATED_BY_DATAHUB, + ), + ) + ) + version_properties = graph_client.get_aspect( + ENTITY_URN.urn(), VersionPropertiesClass + ) + assert version_properties + assert version_properties.isLatest + version_set_properties = graph_client.get_aspect( + NOT_EXISTS_VERSION_SET_URN.urn(), VersionSetPropertiesClass + ) + assert version_set_properties + assert version_set_properties.latest == ENTITY_URN.urn() + assert ( + version_set_properties.versioningScheme + == VersioningSchemeClass.ALPHANUMERIC_GENERATED_BY_DATAHUB + ) + + +def test_ingest_version_properties_version_set_new_latest(graph_client: DataHubGraph): + old_latest_version_properties = graph_client.get_aspect( + OLD_LATEST_URN.urn(), VersionPropertiesClass + ) + assert old_latest_version_properties + assert old_latest_version_properties.isLatest + + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=ENTITY_URN.urn(), + aspect=VersionPropertiesClass( + versionSet=EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag="second"), + sortId="bbb", + versioningScheme=VersioningSchemeClass.LEXICOGRAPHIC_STRING, + ), + ) + ) + old_latest_version_properties = graph_client.get_aspect( + OLD_LATEST_URN.urn(), VersionPropertiesClass + ) + assert old_latest_version_properties + assert not old_latest_version_properties.isLatest + version_set_properties = graph_client.get_aspect( + EXISTS_VERSION_SET_URN.urn(), VersionSetPropertiesClass + ) + assert version_set_properties + assert version_set_properties.latest == ENTITY_URN.urn() + + new_latest_version_properties = graph_client.get_aspect( + ENTITY_URN.urn(), VersionPropertiesClass + ) + assert new_latest_version_properties + assert new_latest_version_properties.isLatest + + +def test_ingest_version_properties_version_set_not_latest(graph_client: DataHubGraph): + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=ENTITY_URN.urn(), + aspect=VersionPropertiesClass( + versionSet=EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag="zero"), + sortId="aaa", + versioningScheme=VersioningSchemeClass.LEXICOGRAPHIC_STRING, + ), + ) + ) + new_latest_version_properties = graph_client.get_aspect( + ENTITY_URN.urn(), VersionPropertiesClass + ) + assert new_latest_version_properties + assert not new_latest_version_properties.isLatest + old_latest_version_properties = graph_client.get_aspect( + OLD_LATEST_URN.urn(), VersionPropertiesClass + ) + assert old_latest_version_properties + assert old_latest_version_properties.isLatest + + version_set_properties = graph_client.get_aspect( + EXISTS_VERSION_SET_URN.urn(), VersionSetPropertiesClass + ) + assert version_set_properties + assert version_set_properties.latest == OLD_LATEST_URN.urn() + + +@pytest.fixture +def ingest_cleanup_data_bulk(graph_client: DataHubGraph): + try: + yield + finally: + for urn in BULK_URNS: + graph_client.hard_delete_entity(urn) + + +def test_ingest_many_versions(graph_client: DataHubGraph, ingest_cleanup_data_bulk): + for i in range(5, 15): + graph_client.emit( + MetadataChangeProposalWrapper( + entityUrn=BULK_URNS[i - 5], + aspect=VersionPropertiesClass( + versionSet=NOT_EXISTS_VERSION_SET_URN.urn(), + version=VersionTagClass(versionTag=f"v{i}"), + sortId=str(i).zfill(2), + versioningScheme=VersioningSchemeClass.LEXICOGRAPHIC_STRING, + ), + ) + ) + expected_latest_urn = BULK_URNS[-1] + expected_latest_version_properties = graph_client.get_aspect( + expected_latest_urn, VersionPropertiesClass + ) + assert expected_latest_version_properties + assert expected_latest_version_properties.isLatest + + version_set_properties = graph_client.get_aspect( + NOT_EXISTS_VERSION_SET_URN.urn(), VersionSetPropertiesClass + ) + assert version_set_properties + assert version_set_properties.latest == expected_latest_urn + assert ( + version_set_properties.versioningScheme + == VersioningSchemeClass.LEXICOGRAPHIC_STRING + ) + wait_for_writes_to_sync() + result = graph_client.execute_graphql( + """ + query getVersions($urn: String!) { + versionSet(urn: $urn) { + versionsSearch(input: {query: "*", count: 20, searchFlags: {skipCache: true}}) { + searchResults { + entity { + urn + } + } + } + } + } + """, + variables={"urn": NOT_EXISTS_VERSION_SET_URN.urn()}, + ) + assert result["versionSet"]["versionsSearch"]["searchResults"] == [ + {"entity": {"urn": urn}} for urn in list(reversed(BULK_URNS)) + ]