Skip to content

Commit

Permalink
Merge branch 'master' into bq-multi-partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
acrylJonny authored Feb 21, 2025
2 parents 25e6123 + 564ba9e commit 358d6d5
Show file tree
Hide file tree
Showing 32 changed files with 2,220 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@
import com.linkedin.datahub.graphql.resolvers.health.EntityHealthResolver;
import com.linkedin.datahub.graphql.resolvers.incident.EntityIncidentsResolver;
import com.linkedin.datahub.graphql.resolvers.incident.RaiseIncidentResolver;
import com.linkedin.datahub.graphql.resolvers.incident.UpdateIncidentResolver;
import com.linkedin.datahub.graphql.resolvers.incident.UpdateIncidentStatusResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CancelIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateIngestionExecutionRequestResolver;
Expand Down Expand Up @@ -1397,6 +1398,9 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher(
"updateIncidentStatus",
new UpdateIncidentStatusResolver(this.entityClient, this.entityService))
.dataFetcher(
"updateIncident",
new UpdateIncidentResolver(this.entityClient, this.entityService))
.dataFetcher(
"createForm", new CreateFormResolver(this.entityClient, this.formService))
.dataFetcher("deleteForm", new DeleteFormResolver(this.entityClient))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityIncidentsResult;
import com.linkedin.datahub.graphql.generated.Incident;
import com.linkedin.datahub.graphql.generated.IncidentPriority;
import com.linkedin.datahub.graphql.types.incident.IncidentMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
Expand Down Expand Up @@ -38,6 +39,9 @@ public class EntityIncidentsResolver
static final String INCIDENT_ENTITIES_SEARCH_INDEX_FIELD_NAME = "entities.keyword";
static final String INCIDENT_STATE_SEARCH_INDEX_FIELD_NAME = "state";
static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created";
static final String INCIDENT_STAGE_SEARCH_INDEX_FIELD_NAME = "stage";
static final String INCIDENT_PRIORITY_SEARCH_INDEX_FIELD_NAME = "priority";
static final String INCIDENT_ASSIGNEES_SEARCH_INDEX_FIELD_NAME = "assignees";

private final EntityClient _entityClient;

Expand All @@ -55,12 +59,18 @@ public CompletableFuture<EntityIncidentsResult> get(DataFetchingEnvironment envi
final Integer start = environment.getArgumentOrDefault("start", 0);
final Integer count = environment.getArgumentOrDefault("count", 20);
final Optional<String> maybeState = Optional.ofNullable(environment.getArgument("state"));

final Optional<String> maybeStage = Optional.ofNullable(environment.getArgument("stage"));
final Optional<String> maybePriority =
Optional.ofNullable(environment.getArgument("priority"));
final Optional<List<String>> maybeAssigneeUrns =
Optional.ofNullable(environment.getArgument("assigneeUrns"));
try {
// Step 1: Fetch set of incidents associated with the target entity from the Search
// Index!
// We use the search index so that we can easily sort by the last updated time.
final Filter filter = buildIncidentsEntityFilter(entityUrn, maybeState);
final Filter filter =
buildIncidentsFilter(
entityUrn, maybeState, maybeStage, maybePriority, maybeAssigneeUrns);
final List<SortCriterion> sortCriteria = buildIncidentsSortCriteria();
final SearchResult searchResult =
_entityClient.filter(
Expand Down Expand Up @@ -110,13 +120,33 @@ public CompletableFuture<EntityIncidentsResult> get(DataFetchingEnvironment envi
"get");
}

private Filter buildIncidentsEntityFilter(
final String entityUrn, final Optional<String> maybeState) {
final Map<String, String> criterionMap = new HashMap<>();
criterionMap.put(INCIDENT_ENTITIES_SEARCH_INDEX_FIELD_NAME, entityUrn);
private Filter buildIncidentsFilter(
final String entityUrn,
final Optional<String> maybeState,
final Optional<String> maybeStage,
final Optional<String> maybePriority,
final Optional<List<String>> maybeAssigneeUrns) {
final Map<String, List<String>> criterionMap = new HashMap<>();
criterionMap.put(
INCIDENT_ENTITIES_SEARCH_INDEX_FIELD_NAME, Collections.singletonList(entityUrn));
maybeState.ifPresent(
incidentState -> criterionMap.put(INCIDENT_STATE_SEARCH_INDEX_FIELD_NAME, incidentState));
return QueryUtils.newFilter(criterionMap);
incidentState ->
criterionMap.put(
INCIDENT_STATE_SEARCH_INDEX_FIELD_NAME, Collections.singletonList(incidentState)));
maybeStage.ifPresent(
incidentStage ->
criterionMap.put(
INCIDENT_STAGE_SEARCH_INDEX_FIELD_NAME, Collections.singletonList(incidentStage)));
maybePriority.ifPresent(
incidentPriority ->
criterionMap.put(
INCIDENT_PRIORITY_SEARCH_INDEX_FIELD_NAME,
Collections.singletonList(
IncidentUtils.mapIncidentPriority(IncidentPriority.valueOf(incidentPriority))
.toString())));
maybeAssigneeUrns.ifPresent(
assigneeUrns -> criterionMap.put(INCIDENT_ASSIGNEES_SEARCH_INDEX_FIELD_NAME, assigneeUrns));
return QueryUtils.newListsFilter(criterionMap);
}

private List<SortCriterion> buildIncidentsSortCriteria() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,41 @@
import com.datahub.authorization.ConjunctivePrivilegeGroup;
import com.datahub.authorization.DisjunctivePrivilegeGroup;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.generated.IncidentPriority;
import com.linkedin.incident.IncidentAssignee;
import com.linkedin.incident.IncidentAssigneeArray;
import com.linkedin.incident.IncidentStage;
import com.linkedin.incident.IncidentState;
import com.linkedin.incident.IncidentStatus;
import com.linkedin.metadata.authorization.PoliciesConfig;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class IncidentUtils {

public static List<Urn> stringsToUrns(List<String> urns) {
return urns.stream()
.map(
rawUrn -> {
try {
return Urn.createFromString(rawUrn);
} catch (Exception e) {
return null;
}
})
.filter(Objects::nonNull)
.distinct()
.toList();
}

public static boolean isAuthorizedToEditIncidentForResource(
final Urn resourceUrn, final QueryContext context) {
final DisjunctivePrivilegeGroup orPrivilegeGroups =
Expand All @@ -22,4 +50,61 @@ public static boolean isAuthorizedToEditIncidentForResource(
return AuthorizationUtils.isAuthorized(
context, resourceUrn.getEntityType(), resourceUrn.toString(), orPrivilegeGroups);
}

@Nullable
public static Integer mapIncidentPriority(@Nullable final IncidentPriority priority) {
if (priority == null) {
return null;
}
switch (priority) {
case LOW:
return 3;
case MEDIUM:
return 2;
case HIGH:
return 1;
case CRITICAL:
return 0;
default:
throw new IllegalArgumentException("Invalid incident priority: " + priority);
}
}

@Nullable
public static IncidentAssigneeArray mapIncidentAssignees(
@Nullable final List<String> assignees, @Nonnull final AuditStamp auditStamp) {
if (assignees == null) {
return null;
}
return new IncidentAssigneeArray(
assignees.stream()
.map(assignee -> createAssignee(assignee, auditStamp))
.collect(Collectors.toList()));
}

@Nonnull
public static IncidentStatus mapIncidentStatus(
@Nullable final com.linkedin.datahub.graphql.generated.IncidentStatusInput input,
@Nonnull final AuditStamp auditStamp) {
if (input == null) {
return new IncidentStatus().setState(IncidentState.ACTIVE).setLastUpdated(auditStamp);
}

IncidentStatus status = new IncidentStatus();
status.setState(IncidentState.valueOf(input.getState().toString()));
if (input.getStage() != null) {
status.setStage(IncidentStage.valueOf(input.getStage().toString()));
}
if (input.getMessage() != null) {
status.setMessage(input.getMessage());
}
return status;
}

private static IncidentAssignee createAssignee(
@Nonnull final String assigneeUrn, @Nonnull final AuditStamp auditStamp) {
return new IncidentAssignee().setActor(UrnUtils.getUrn(assigneeUrn)).setAssignedAt(auditStamp);
}

private IncidentUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.datahub.graphql.authorization.AuthorizationUtils.ALL_PRIVILEGES_GROUP;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.resolvers.incident.IncidentUtils.*;
import static com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils.*;
import static com.linkedin.metadata.Constants.*;

Expand All @@ -21,21 +22,23 @@
import com.linkedin.incident.IncidentInfo;
import com.linkedin.incident.IncidentSource;
import com.linkedin.incident.IncidentSourceType;
import com.linkedin.incident.IncidentState;
import com.linkedin.incident.IncidentStatus;
import com.linkedin.incident.IncidentType;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.key.IncidentKey;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/** Resolver used for creating (raising) a new asset incident. */
// TODO: Add an incident impact summary that is computed here (or in a hook)
@Slf4j
@RequiredArgsConstructor
public class RaiseIncidentResolver implements DataFetcher<CompletableFuture<String>> {
Expand All @@ -48,13 +51,27 @@ public CompletableFuture<String> get(DataFetchingEnvironment environment) throws
final QueryContext context = environment.getContext();
final RaiseIncidentInput input =
bindArgument(environment.getArgument("input"), RaiseIncidentInput.class);
final Urn resourceUrn = Urn.createFromString(input.getResourceUrn());
final Urn resourceUrn =
input.getResourceUrn() != null ? Urn.createFromString(input.getResourceUrn()) : null;
final List<Urn> resourceUrns =
new ArrayList<>(
input.getResourceUrns() != null
? stringsToUrns(input.getResourceUrns())
: Collections.emptyList());
if (resourceUrn != null && !resourceUrns.contains(resourceUrn)) {
resourceUrns.add(resourceUrn);
}
if (resourceUrns.isEmpty()) {
throw new RuntimeException("At least 1 resource urn must be defined to raise an incident.");
}

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
if (!isAuthorizedToCreateIncidentForResource(resourceUrn, context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
for (Urn urn : resourceUrns) {
if (!isAuthorizedToCreateIncidentForResource(urn, context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
}
}

try {
Expand All @@ -71,45 +88,53 @@ public CompletableFuture<String> get(DataFetchingEnvironment environment) throws
key,
INCIDENT_ENTITY_NAME,
INCIDENT_INFO_ASPECT_NAME,
mapIncidentInfo(input, context));
mapIncidentInfo(input, resourceUrns, context));
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
log.error("Failed to create incident. {}", e.getMessage());
throw new RuntimeException("Failed to incident", e);
throw new RuntimeException(e.getMessage());
}
},
this.getClass().getSimpleName(),
"get");
}

private IncidentInfo mapIncidentInfo(final RaiseIncidentInput input, final QueryContext context)
private IncidentInfo mapIncidentInfo(
final RaiseIncidentInput input, List<Urn> resourceUrns, final QueryContext context)
throws URISyntaxException {
final AuditStamp actorStamp =
new AuditStamp()
.setActor(Urn.createFromString(context.getActorUrn()))
.setTime(System.currentTimeMillis());
final IncidentInfo result = new IncidentInfo();
result.setType(
IncidentType.valueOf(
input
.getType()
.name())); // Assumption Alert: This assumes that GMS incident type === GraphQL
// incident type.
if (IncidentType.CUSTOM.name().equals(input.getType().name())
&& input.getCustomType() == null) {
throw new URISyntaxException("Failed to create incident.", "customType is required");
}
result.setCustomType(input.getCustomType(), SetMode.IGNORE_NULL);
result.setTitle(input.getTitle(), SetMode.IGNORE_NULL);
result.setDescription(input.getDescription(), SetMode.IGNORE_NULL);
result.setEntities(
new UrnArray(ImmutableList.of(Urn.createFromString(input.getResourceUrn()))));
result.setEntities(new UrnArray(resourceUrns));
result.setCreated(
new AuditStamp()
.setActor(Urn.createFromString(context.getActorUrn()))
.setTime(System.currentTimeMillis()));
if (input.getStartedAt() != null) {
result.setStartedAt(input.getStartedAt());
}
// Create the incident in the 'active' state by default.
result.setStatus(
new IncidentStatus()
.setState(IncidentState.ACTIVE)
.setLastUpdated(
new AuditStamp()
.setActor(Urn.createFromString(context.getActorUrn()))
.setTime(System.currentTimeMillis())));
result.setSource(new IncidentSource().setType(IncidentSourceType.MANUAL), SetMode.IGNORE_NULL);
result.setPriority(input.getPriority(), SetMode.IGNORE_NULL);
result.setPriority(IncidentUtils.mapIncidentPriority(input.getPriority()), SetMode.IGNORE_NULL);
result.setAssignees(
IncidentUtils.mapIncidentAssignees(input.getAssigneeUrns(), actorStamp),
SetMode.IGNORE_NULL);
result.setStatus(IncidentUtils.mapIncidentStatus(input.getStatus(), actorStamp));
return result;
}

Expand Down
Loading

0 comments on commit 358d6d5

Please sign in to comment.