Skip to content

Commit

Permalink
(feat) Add deleteWithReturn
Browse files Browse the repository at this point in the history
  • Loading branch information
jphui committed Feb 12, 2025
1 parent 2de35c0 commit 2f4a4fd
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 15 deletions.
49 changes: 36 additions & 13 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.linkedin.metadata.query.IndexSortCriterion;
import java.sql.Timestamp;
import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -659,7 +660,10 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> aspectUpdateHelper(URN

private <ASPECT extends RecordTemplate> ASPECT_UNION unwrapAddResultToUnion(URN urn, AddResult<ASPECT> result,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
// handle post-update hooks and emit MAE + return the newValue
ASPECT rawResult = unwrapAddResult(urn, result, auditStamp, trackingContext);

// package it into a union
return ModelUtils.newEntityUnion(_aspectUnionClass, rawResult);
}

Expand Down Expand Up @@ -817,53 +821,58 @@ public <ASPECT extends RecordTemplate> ASPECT add(@Nonnull URN urn, AspectUpdate
* <p>The new aspect will have an automatically assigned version number, which is guaranteed to be positive and
* monotonically increasing. Older versions of aspect will be purged automatically based on the retention setting.
*
* <p>Note that we do not support Post-update hooks while soft deleting an aspect
* <p>Note that we do not currently support pre- or post- update hooks while soft deleting an aspect.
*
* @param urn urn the URN for the entity the aspects are attached to
* @param aspectClasses Aspect Classes of the aspects being deleted, must be supported aspect types in {@code ASPECT_UNION}
* Because Aspect Classes must be unique for a given Entity, we use a set to avoid duplicates.
* @param auditStamp the audit stamp of this action
* @param maxTransactionRetry maximum number of transaction retries before throwing an exception
* @return a collection of the deleted aspects (their value before deletion), each wrapped in an instance of {@link ASPECT_UNION}
*/
public void deleteMany(@Nonnull URN urn,
public Collection<ASPECT_UNION> deleteMany(@Nonnull URN urn,
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull AuditStamp auditStamp,
int maxTransactionRetry) {
deleteMany(urn, aspectClasses, auditStamp, maxTransactionRetry, null);
return deleteMany(urn, aspectClasses, auditStamp, maxTransactionRetry, null);
}

/**
* Similar to {@link #deleteMany(Urn, Set, AuditStamp, int)} but uses the default maximum transaction retry.
*/
@Nonnull
public void deleteMany(@Nonnull URN urn, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
public Collection<ASPECT_UNION> deleteMany(
@Nonnull URN urn, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull AuditStamp auditStamp) {
deleteMany(urn, aspectClasses, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY);
return deleteMany(urn, aspectClasses, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY);
}

/**
* Same as above {@link #deleteMany(Urn, Set, AuditStamp)} but with tracking context.
*/
@Nonnull
public void deleteMany(@Nonnull URN urn, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
public Collection<ASPECT_UNION> deleteMany(
@Nonnull URN urn, @Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) {
deleteMany(urn, aspectClasses, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext);
return deleteMany(urn, aspectClasses, auditStamp, DEFAULT_MAX_TRANSACTION_RETRY, trackingContext);
}

/**
* Same as {@link #deleteMany(Urn, Set, AuditStamp, int)} but with tracking context.
*/
public void deleteMany(@Nonnull URN urn,
public Collection<ASPECT_UNION> deleteMany(@Nonnull URN urn,
@Nonnull Set<Class<? extends RecordTemplate>> aspectClasses,
@Nonnull AuditStamp auditStamp,
int maxTransactionRetry,
@Nullable IngestionTrackingContext trackingContext) {

// entire delete operation should be atomic
runInTransactionWithRetry(() -> {
aspectClasses.forEach(x -> delete(urn, x, auditStamp, maxTransactionRetry, trackingContext));
return null;
}, maxTransactionRetry);
final Collection<RecordTemplate> results = runInTransactionWithRetry(() -> aspectClasses.stream()
.map(x -> deleteWithReturn(urn, x, auditStamp, maxTransactionRetry, trackingContext))
.collect(Collectors.toList()), maxTransactionRetry);

// package into ASPECT_UNION, this is logic performed in unwrapAddResultToUnion()
return results.stream().map(x -> ModelUtils.newEntityUnion(_aspectUnionClass, x)).collect(Collectors.toList());
}

/**
Expand All @@ -890,16 +899,30 @@ public <ASPECT extends RecordTemplate> void delete(@Nonnull URN urn, @Nonnull Cl
*/
public <ASPECT extends RecordTemplate> void delete(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {
deleteWithReturn(urn, aspectClass, auditStamp, maxTransactionRetry, trackingContext);
}

/**
* Deletes the latest version of an aspect for an entity and returns the ***old value***.
*/
public <ASPECT extends RecordTemplate> ASPECT deleteWithReturn(@Nonnull URN urn, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, int maxTransactionRetry, @Nullable IngestionTrackingContext trackingContext) {

checkValidAspect(aspectClass);

runInTransactionWithRetry(() -> {
final AddResult<ASPECT> result = runInTransactionWithRetry(() -> {
final AspectEntry<ASPECT> latest = getLatest(urn, aspectClass, false);
final IngestionParams ingestionParams = new IngestionParams().setIngestionMode(IngestionMode.LIVE);
return addCommon(urn, latest, null, aspectClass, auditStamp, new DefaultEqualityTester<>(), trackingContext, ingestionParams);
}, maxTransactionRetry);

return result.getOldValue();

// TODO: add support for sending MAE for soft deleted aspects
// FY25H2 Note: When performing an Aspect UPDATE, unwrapAddResultToUnion() is called, which emits MAE and does post-update hooks.
// When doing similar for DELETE, we should end up doing something similar, but specific to deletion.
// We *could* modify the existing unwrapAddResultToUnion() to account for both cases and just reuse it completely,
// but this might be confusing, so it might be best to make a deletion-specific version of that method.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -2633,8 +2634,38 @@ public void testRemoveRelationshipsDuringAspectSoftDeletion() throws URISyntaxEx
public void testDeleteManyWithRelationshipRemoval() throws URISyntaxException {
FooUrn fooUrn = makeFooUrn(1);
EbeanLocalDAO<EntityAspectUnion, FooUrn> fooDao = createDao(FooUrn.class);
// necessary flag to prevent removal of existing same-type relationships in "another aspect"
fooDao.setUseAspectColumnForRelationshipRemoval(true);

setupAspectsAndRelationships(fooUrn, fooDao);
EbeanLocalDAO<EntityAspectUnion, BarUrn> barDao = createDao(BarUrn.class);

// add an aspect (AspectFooBar) which includes BelongsTo relationships and ReportsTo relationships
BarUrn barUrn1 = BarUrn.createFromString("urn:li:bar:1");
BelongsToV2 belongsTo1 = new BelongsToV2().setDestination(BelongsToV2.Destination.create(barUrn1.toString()));
BarUrn barUrn2 = BarUrn.createFromString("urn:li:bar:2");
BelongsToV2 belongsTo2 = new BelongsToV2().setDestination(BelongsToV2.Destination.create(barUrn2.toString()));
BarUrn barUrn3 = BarUrn.createFromString("urn:li:bar:3");
BelongsToV2 belongsTo3 = new BelongsToV2().setDestination(BelongsToV2.Destination.create(barUrn3.toString()));
BelongsToV2Array belongsToArray = new BelongsToV2Array(belongsTo1, belongsTo2, belongsTo3);
ReportsTo reportsTo = new ReportsTo().setSource(fooUrn).setDestination(barUrn1);
ReportsToArray reportsToArray = new ReportsToArray(reportsTo);
AspectFooBar aspectFooBar = new AspectFooBar()
.setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3)).setBelongsTos(belongsToArray).setReportsTos(reportsToArray);
AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis());

fooDao.add(fooUrn, aspectFooBar, auditStamp);
barDao.add(barUrn1, new AspectFoo().setValue("1"), auditStamp);
barDao.add(barUrn2, new AspectFoo().setValue("2"), auditStamp);
barDao.add(barUrn3, new AspectFoo().setValue("3"), auditStamp);

// add an aspect (AspectFooBaz) which includes BelongsTo relationships
BarUrn barUrn4 = BarUrn.createFromString("urn:li:bar:4");
BelongsToV2 belongsTo4 = new BelongsToV2().setDestination(BelongsToV2.Destination.create(barUrn4.toString()));
BelongsToV2Array belongsToArray2 = new BelongsToV2Array(belongsTo4);
AspectFooBaz aspectFooBaz = new AspectFooBaz().setBars(new BarUrnArray(barUrn4)).setBelongsTos(belongsToArray2);

fooDao.add(fooUrn, aspectFooBaz, auditStamp);
barDao.add(barUrn4, new AspectFoo().setValue("4"), auditStamp);

// Verify local relationships and entities are added.
EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server);
Expand All @@ -2658,7 +2689,24 @@ public void testDeleteManyWithRelationshipRemoval() throws URISyntaxException {
assertEquals(aspects.size(), 1);

// soft delete the AspectFooBar and AspectFooBaz aspects
fooDao.deleteMany(fooUrn, new HashSet<>(Arrays.asList(AspectFooBar.class, AspectFooBaz.class)), _dummyAuditStamp);
Collection<EntityAspectUnion> deletedAspects =
fooDao.deleteMany(fooUrn, new HashSet<>(Arrays.asList(AspectFooBar.class, AspectFooBaz.class)), _dummyAuditStamp);

assertEquals(deletedAspects.size(), 2);

// check that the AspectFooBar content returned matches the pre-deletion content
Optional<EntityAspectUnion> aspectFooBarDeleted = deletedAspects.stream()
.filter(aspect -> aspect.getAspectFooBar() != null)
.findFirst();
assertTrue(aspectFooBarDeleted.isPresent());
assertEquals(aspectFooBarDeleted.get().getAspectFooBar(), aspectFooBar);

// check that the AspectFooBaz content returned matches the pre-deletion content
Optional<EntityAspectUnion> aspectFooBazDeleted = deletedAspects.stream()
.filter(aspect -> aspect.getAspectFooBaz() != null)
.findFirst();
assertTrue(aspectFooBazDeleted.isPresent());
assertEquals(aspectFooBazDeleted.get().getAspectFooBaz(), aspectFooBaz);

// check that the belongsTo relationships 1, 2, 3, and 4 were soft deleted
resultBelongsTos = ebeanLocalRelationshipQueryDAO.findRelationships(FooSnapshot.class, EMPTY_FILTER, BarSnapshot.class,
Expand All @@ -2682,6 +2730,45 @@ public void testDeleteManyWithRelationshipRemoval() throws URISyntaxException {
assertFalse(optionalAspect2.isPresent());
}

@Test
public void testDeleteWithReturnOnNonexistentAsset() {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
FooUrn urn = makeFooUrn(1);

AspectFoo foo = dao.deleteWithReturn(urn, AspectFoo.class, _dummyAuditStamp, 3, null);
assertNull(foo);
}

@Test
public void testDeleteWithReturnOnNullAspect() {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
FooUrn urn = makeFooUrn(1);

// add aspect so the row exists in the entity table, but the column for other aspects will be empty
AspectFoo v0 = new AspectFoo().setValue("foo");
dao.add(urn, v0, _dummyAuditStamp);

// attempt to delete an aspect that doesn't exist
AspectBaz foo = dao.deleteWithReturn(urn, AspectBaz.class, _dummyAuditStamp, 3, null);
assertNull(foo);
}

@Test
public void testDeleteWithReturnOnAlreadyDeletedAspect() {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
FooUrn urn = makeFooUrn(1);
AspectFoo v0 = new AspectFoo().setValue("foo");
dao.add(urn, v0, _dummyAuditStamp);
AspectFoo foo = dao.deleteWithReturn(urn, AspectFoo.class, _dummyAuditStamp, 3, null);

// make sure that the content matches the original
assertEquals(foo, v0);

// attempt to delete an aspect that has already been deleted
AspectFoo fooAgain = dao.deleteWithReturn(urn, AspectFoo.class, _dummyAuditStamp, 3, null);
assertNull(fooAgain);
}

@Test
public void testGetWithExtraInfoMultipleKeys() {
EbeanLocalDAO<EntityAspectUnion, FooUrn> dao = createDao(FooUrn.class);
Expand Down

0 comments on commit 2f4a4fd

Please sign in to comment.