Skip to content

Commit

Permalink
fix(lineage) Support views and sorting in impact analysis (#12769)
Browse files Browse the repository at this point in the history
  • Loading branch information
chriscollins3456 authored Mar 3, 2025
1 parent 3f6b8c1 commit 17de393
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
new ScrollAcrossEntitiesResolver(this.entityClient, this.viewService))
.dataFetcher(
"searchAcrossLineage",
new SearchAcrossLineageResolver(this.entityClient, this.entityRegistry))
new SearchAcrossLineageResolver(
this.entityClient, this.entityRegistry, this.viewService))
.dataFetcher(
"scrollAcrossLineage", new ScrollAcrossLineageResolver(this.entityClient))
.dataFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.AndFilterInput;
Expand All @@ -25,8 +26,12 @@
import com.linkedin.metadata.query.LineageFlags;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.metadata.service.ViewService;
import com.linkedin.metadata.utils.elasticsearch.FilterUtils;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.view.DataHubViewInfo;
import graphql.VisibleForTesting;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
Expand All @@ -53,10 +58,13 @@ public class SearchAcrossLineageResolver

private final EntityRegistry _entityRegistry;

private final ViewService _viewService;

@VisibleForTesting final Set<String> _allEntities;
private final List<String> _allowedEntities;

public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry entityRegistry) {
public SearchAcrossLineageResolver(
EntityClient entityClient, EntityRegistry entityRegistry, final ViewService viewService) {
this._entityClient = entityClient;
this._entityRegistry = entityRegistry;
this._allEntities =
Expand All @@ -68,6 +76,8 @@ public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry ent
this._allEntities.stream()
.filter(e -> !TRANSIENT_ENTITIES.contains(e))
.collect(Collectors.toList());

this._viewService = viewService;
}

private List<String> getEntityNamesFromInput(List<EntityType> inputTypes) {
Expand Down Expand Up @@ -127,6 +137,13 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final DataHubViewInfo maybeResolvedView =
(input.getViewUrn() != null)
? resolveView(
context.getOperationContext(),
_viewService,
UrnUtils.getUrn(input.getViewUrn()))
: null;
try {
log.debug(
"Executing search across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand All @@ -138,8 +155,13 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
start,
count);

final Filter filter =
final Filter baseFilter =
ResolverUtils.buildFilter(input.getFilters(), input.getOrFilters());
Filter filter =
maybeResolvedView != null
? FilterUtils.combineFilters(
baseFilter, maybeResolvedView.getDefinition().getFilter())
: baseFilter;
final SearchFlags searchFlags;
com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
if (inputFlags != null) {
Expand All @@ -150,6 +172,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
} else {
searchFlags = new SearchFlags().setFulltext(true).setSkipHighlighting(true);
}
List<SortCriterion> sortCriteria = SearchUtils.getSortCriteria(input.getSortInput());
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
context
Expand All @@ -162,7 +185,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
sanitizedQuery,
maxHops,
filter,
null,
sortCriteria,
start,
count);

Expand Down
10 changes: 10 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,16 @@ input SearchAcrossLineageInput {
Flags controlling the lineage query
"""
lineageFlags: LineageFlags

"""
Optional - A View to apply when generating results
"""
viewUrn: String

"""
Optional - Information on how to sort this search result
"""
sortInput: SearchSortInput
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.linkedin.metadata.search.LineageSearchEntityArray;
import com.linkedin.metadata.search.MatchedFieldArray;
import com.linkedin.metadata.search.SearchResultMetadata;
import com.linkedin.metadata.service.ViewService;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.io.InputStream;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testAllEntitiesInitialization() {
InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml");
EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
SearchAcrossLineageResolver resolver =
new SearchAcrossLineageResolver(_entityClient, entityRegistry);
new SearchAcrossLineageResolver(_entityClient, entityRegistry, mock(ViewService.class));
assertTrue(resolver._allEntities.contains("dataset"));
assertTrue(resolver._allEntities.contains("dataFlow"));
// Test for case sensitivity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.metadata.search.MatchedFieldArray;
import com.linkedin.metadata.search.SearchResultMetadata;
import com.linkedin.metadata.service.ViewService;
import graphql.schema.DataFetchingEnvironment;
import io.datahubproject.metadata.context.OperationContext;
import java.io.InputStream;
Expand All @@ -48,6 +49,7 @@ public class SearchAcrossLineageResolverTest {
private SearchAcrossLineageResolver _resolver;

private EntityRegistry _entityRegistry;
private ViewService _viewService;

@BeforeMethod
public void setupTest() {
Expand All @@ -56,15 +58,16 @@ public void setupTest() {
_authentication = mock(Authentication.class);

_entityRegistry = mock(EntityRegistry.class);
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry);
_viewService = mock(ViewService.class);
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry, _viewService);
}

@Test
public void testAllEntitiesInitialization() {
InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml");
EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
SearchAcrossLineageResolver resolver =
new SearchAcrossLineageResolver(_entityClient, entityRegistry);
new SearchAcrossLineageResolver(_entityClient, entityRegistry, _viewService);
assertTrue(resolver._allEntities.contains("dataset"));
assertTrue(resolver._allEntities.contains("dataFlow"));
// Test for case sensitivity
Expand Down Expand Up @@ -115,7 +118,7 @@ public void testSearchAcrossLineage() throws Exception {
eq(QUERY),
eq(null),
any(),
eq(null),
eq(Collections.emptyList()),
eq(START),
eq(COUNT)))
.thenReturn(lineageSearchResult);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.linkedin.metadata.utils.elasticsearch;

import com.google.common.collect.ImmutableList;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.utils.CriterionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class FilterUtils {

  /**
   * Applies a view filter on top of a base filter by joining the two in a conjunction (AND).
   *
   * <p>A {@code null} base filter is treated as an empty filter (an "or" with no clauses), so the
   * view filter alone determines the result. Note that when one side is empty, the other side's
   * (converted) filter is returned as-is rather than copied.
   *
   * @param baseFilter the filter the view is applied to; may be {@code null}
   * @param viewFilter the filter defined by the view; must not be {@code null}
   * @return the conjunction of both filters, expressed in disjunctive normal form
   * @throws IllegalArgumentException if a non-null filter populates neither "or" nor "criteria"
   */
  @Nonnull
  public static Filter combineFilters(
      @Nullable final Filter baseFilter, @Nonnull final Filter viewFilter) {
    if (baseFilter != null) {
      return combineFiltersInConjunction(baseFilter, viewFilter);
    }
    // No base filter supplied: substitute an empty disjunction so the view filter wins.
    final Filter emptyFilter =
        new Filter().setOr(new ConjunctiveCriterionArray(Collections.emptyList()));
    return combineFiltersInConjunction(emptyFilter, viewFilter);
  }

  /**
   * Joins two filters in a conjunction (AND) by reducing them to disjunctive normal form.
   *
   * <p>Either filter format is accepted: the "or" field, or the "criteria" field. A filter that
   * only populates "criteria" is first rewritten as an "or" holding a single clause.
   *
   * <p>If one of the filters has no "or" clauses, the other (converted) filter is returned
   * unchanged. Otherwise every clause of the first filter is paired with every clause of the
   * second (a cross product), and the criteria of each pair are concatenated into one clause.
   *
   * <p>Example inputs:
   *
   * <pre>{@code
   * filter1 -> { or: [
   *   { and: [ { field: tags, condition: EQUAL, values: ["urn:li:tag:tag"] } ] },
   *   { and: [ { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term"] } ] }
   * ] }
   * filter2 -> { or: [
   *   { and: [ { field: domain, condition: EQUAL, values: ["urn:li:domain:domain"] } ] },
   *   { and: [ { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term2"] } ] }
   * ] }
   * }</pre>
   *
   * <p>Example output:
   *
   * <pre>{@code
   * { or: [
   *   { and: [
   *     { field: tags, condition: EQUAL, values: ["urn:li:tag:tag"] },
   *     { field: domain, condition: EQUAL, values: ["urn:li:domain:domain"] } ] },
   *   { and: [
   *     { field: tags, condition: EQUAL, values: ["urn:li:tag:tag"] },
   *     { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term2"] } ] },
   *   { and: [
   *     { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term"] },
   *     { field: domain, condition: EQUAL, values: ["urn:li:domain:domain"] } ] },
   *   { and: [
   *     { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term"] },
   *     { field: glossaryTerms, condition: EQUAL, values: ["urn:li:glossaryTerm:term2"] } ] }
   * ] }
   * }</pre>
   *
   * @param filter1 the first filter in the pair
   * @param filter2 the second filter in the pair
   * @return the result of joining the two filters in a conjunction (AND)
   * @throws IllegalArgumentException if either filter populates neither "or" nor "criteria"
   */
  @Nonnull
  private static Filter combineFiltersInConjunction(
      @Nonnull final Filter filter1, @Nonnull final Filter filter2) {
    final Filter left = convertToV2Filter(filter1);
    final Filter right = convertToV2Filter(filter2);

    // An empty side contributes nothing to the conjunction, so the other side is the answer.
    if (hasNoClauses(left)) {
      return right;
    }
    if (hasNoClauses(right)) {
      return left;
    }

    // Cross product: each left clause is AND-ed with each right clause.
    final List<ConjunctiveCriterion> crossProduct = new ArrayList<>();
    for (ConjunctiveCriterion leftClause : left.getOr()) {
      for (ConjunctiveCriterion rightClause : right.getOr()) {
        final List<Criterion> mergedCriteria = new ArrayList<>(leftClause.getAnd());
        mergedCriteria.addAll(rightClause.getAnd());
        crossProduct.add(new ConjunctiveCriterion().setAnd(new CriterionArray(mergedCriteria)));
      }
    }
    return new Filter().setOr(new ConjunctiveCriterionArray(crossProduct));
  }

  /** Returns true when the filter's "or" field is unset or holds zero clauses. */
  private static boolean hasNoClauses(@Nonnull final Filter filter) {
    return !filter.hasOr() || filter.getOr().isEmpty();
  }

  /**
   * Normalizes a filter to the "or" representation.
   *
   * <p>A filter that already populates "or" is returned untouched (the same instance). A filter
   * that only populates "criteria" is wrapped into a new filter whose "or" holds one clause
   * containing those criteria.
   *
   * @param filter the filter to normalize
   * @return a filter with the "or" field populated
   * @throws IllegalArgumentException if neither "or" nor "criteria" is populated
   */
  @Nonnull
  private static Filter convertToV2Filter(@Nonnull Filter filter) {
    if (filter.hasOr()) {
      return filter;
    }
    if (!filter.hasCriteria()) {
      throw new IllegalArgumentException(
          String.format(
              "Illegal filter provided! Neither 'or' nor 'criteria' fields were populated for filter %s",
              filter));
    }
    // Legacy "criteria" format: wrap the criteria into a single-clause disjunction.
    final ConjunctiveCriterion soleClause = new ConjunctiveCriterion().setAnd(filter.getCriteria());
    return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(soleClause)));
  }

  /**
   * Builds a filter holding a single clause with a single {@link Condition#EQUAL} criterion on
   * the given field and values.
   *
   * <p>NOTE(review): how multiple values are interpreted is decided by {@link
   * CriterionUtils#buildCriterion} and the search backend; presumably any-of matching — confirm.
   *
   * @param fieldName the name of the field the criterion applies to
   * @param values the values handed to the criterion
   * @return a new filter in "or" format wrapping the single criterion
   */
  @Nonnull
  public static Filter createValuesFilter(
      @Nonnull final String fieldName, @Nonnull final List<String> values) {
    final StringArray fieldValues = new StringArray();
    fieldValues.addAll(values);

    final CriterionArray soleConjunction = new CriterionArray();
    soleConjunction.add(CriterionUtils.buildCriterion(fieldName, Condition.EQUAL, fieldValues));

    final ConjunctiveCriterion clause = new ConjunctiveCriterion().setAnd(soleConjunction);
    return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(clause)));
  }
}
Loading

0 comments on commit 17de393

Please sign in to comment.