feat(operations): ES and Kafka Operations Endpoints (#12756)
david-leifker authored Mar 3, 2025
1 parent 12eb0cd commit 3dac08d
Showing 13 changed files with 2,325 additions and 9 deletions.
@@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
@@ -401,6 +403,21 @@ public Optional<SearchResponse> raw(
return esSearchDAO.raw(opContext, indexName, jsonQuery);
}

@Override
@Nonnull
public Map<Urn, Map<String, Object>> raw(
@Nonnull OperationContext opContext, @Nonnull Set<Urn> urns) {
return esSearchDAO.rawEntity(opContext, urns).entrySet().stream()
.flatMap(
entry ->
Optional.ofNullable(entry.getValue().getHits().getHits())
.filter(hits -> hits.length > 0)
.map(hits -> Map.entry(entry.getKey(), hits[0]))
.stream())
.map(entry -> Map.entry(entry.getKey(), entry.getValue().getSourceAsMap()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
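
// Reviewer note (not part of this commit): a minimal caller sketch for the new bulk
// raw-document override, assuming hypothetical `searchService` (an instance of this class)
// and `opContext` references:
//
//   Set<Urn> urns = Set.of(UrnUtils.getUrn("urn:li:corpuser:datahub"));
//   Map<Urn, Map<String, Object>> rawDocs = searchService.raw(opContext, urns);
//   rawDocs.forEach((urn, doc) -> log.info("{} -> {}", urn, doc.keySet()));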

@Override
public int maxResultSize() {
return ESUtils.MAX_RESULT_SIZE;
@@ -2,17 +2,20 @@

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.aspect.patch.template.TemplateUtil.*;
import static com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder.URN_FIELD;
import static com.linkedin.metadata.utils.SearchUtil.*;

import com.datahub.util.exception.ESQueryException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.metadata.config.search.SearchConfiguration;
import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
@@ -37,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
@@ -58,6 +62,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

@@ -615,6 +620,43 @@ public Optional<SearchResponse> raw(
});
}

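/**
 * Fetches the raw stored search document for each URN by issuing a term query against the URN
 * field of the corresponding entity index.
 *
 * @param opContext the operation context
 * @param urns the entity URNs to resolve
 * @return map of URN to the raw SearchResponse from that entity's index
 */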
public Map<Urn, SearchResponse> rawEntity(@Nonnull OperationContext opContext, Set<Urn> urns) {
EntityRegistry entityRegistry = opContext.getEntityRegistry();
Map<Urn, EntitySpec> specs =
urns.stream()
.flatMap(
urn ->
Optional.ofNullable(entityRegistry.getEntitySpec(urn.getEntityType()))
.map(spec -> Map.entry(urn, spec))
.stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

return specs.entrySet().stream()
.map(
entry -> {
try {
String indexName =
opContext
.getSearchContext()
.getIndexConvention()
.getIndexName(entry.getValue());

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(
QueryBuilders.termQuery(URN_FIELD, entry.getKey().toString()));

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);

return Map.entry(
entry.getKey(), client.search(searchRequest, RequestOptions.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
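
// Note (editorial, not in this commit): rawEntity issues one synchronous search request per
// URN, and an IOException on any request surfaces as a RuntimeException from the stream. Since
// each URN may resolve to a different entity index, per-URN requests keep routing simple; for
// large URN sets a multi-search (msearch) batch could reduce round trips.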

private boolean supportsPointInTime() {
return pointInTimeCreationEnabled
&& ELASTICSEARCH_IMPLEMENTATION_ELASTICSEARCH.equalsIgnoreCase(elasticSearchImplementation);
@@ -11,6 +11,7 @@
import io.datahubproject.openapi.v1.models.TraceStorageStatus;
import io.datahubproject.openapi.v1.models.TraceWriteStatus;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -62,7 +63,14 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
private final Cache<TopicPartition, OffsetAndMetadata> offsetCache =
Caffeine.newBuilder()
.maximumSize(100) // unlikely to have more than 100 partitions
-.expireAfterWrite(Duration.ofMinutes(5)) // Shorter expiry for offsets
+.expireAfterWrite(
+    Duration.ofMinutes(5)) // Short expiry since committed offsets change frequently
.build();
private final Cache<TopicPartition, Long> endOffsetCache =
Caffeine.newBuilder()
.maximumSize(100) // Match the size of offsetCache
.expireAfterWrite(
Duration.ofSeconds(5)) // Short expiry since end offsets change frequently
.build();

public KafkaTraceReader(
@@ -218,6 +226,225 @@ public Map<Urn, Map<String, TraceStorageStatus>> tracePendingStatuses(
}
}

/**
* Returns the current consumer group offsets for all partitions of the topic.
*
* @param skipCache Whether to skip the cache when fetching offsets
* @return Map of TopicPartition to OffsetAndMetadata; an empty map if no offsets are found or
*     an error occurs
*/
public Map<TopicPartition, OffsetAndMetadata> getAllPartitionOffsets(boolean skipCache) {
final String consumerGroupId = getConsumerGroupId();
if (consumerGroupId == null) {
log.warn("Cannot get partition offsets: consumer group ID is null");
return Collections.emptyMap();
}

try {
// Get all topic partitions first
Map<String, TopicDescription> topicInfo =
adminClient
.describeTopics(Collections.singletonList(getTopicName()))
.all()
.get(timeoutSeconds, TimeUnit.SECONDS);

if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
log.error("Failed to get topic information for topic: {}", getTopicName());
return Collections.emptyMap();
}

// Create a list of all TopicPartitions
List<TopicPartition> allPartitions =
topicInfo.get(getTopicName()).partitions().stream()
.map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
.collect(Collectors.toList());

// Pre-populate the result map with cached offsets, unless the caller asked to skip the cache
Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
if (!skipCache) {
for (TopicPartition partition : allPartitions) {
OffsetAndMetadata cached = offsetCache.getIfPresent(partition);
if (cached != null) {
result.put(partition, cached);
}
}
}

// If we have all partitions from cache and aren't skipping cache, return early
if (!skipCache && result.size() == allPartitions.size()) {
return result;
}

// Get all offsets for the consumer group
ListConsumerGroupOffsetsResult offsetsResult =
adminClient.listConsumerGroupOffsets(consumerGroupId);
if (offsetsResult == null) {
log.error("Failed to get consumer group offsets for group: {}", consumerGroupId);
return result;
}

Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
offsetsResult.partitionsToOffsetAndMetadata().get(timeoutSeconds, TimeUnit.SECONDS);

if (fetchedOffsets == null) {
log.error("Null offsets returned for consumer group: {}", consumerGroupId);
return result;
}

// Filter to only keep offsets for our topic
Map<TopicPartition, OffsetAndMetadata> topicOffsets =
fetchedOffsets.entrySet().stream()
.filter(entry -> entry.getKey().topic().equals(getTopicName()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Update the cache for each offset
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsets.entrySet()) {
offsetCache.put(entry.getKey(), entry.getValue());
}

// Return the offsets for this topic
return topicOffsets;
} catch (Exception e) {
log.error("Error fetching all partition offsets for topic {}", getTopicName(), e);
return Collections.emptyMap();
}
}

/**
* Returns the end offsets (latest offsets) for all partitions of the topic.
*
* @param skipCache Whether to skip the cache when fetching end offsets
* @return Map of TopicPartition to end offset; an empty map if no offsets are found or an error occurs
*/
public Map<TopicPartition, Long> getEndOffsets(boolean skipCache) {
try {
// Get all topic partitions first (reuse the same approach as in getAllPartitionOffsets)
Map<String, TopicDescription> topicInfo =
adminClient
.describeTopics(Collections.singletonList(getTopicName()))
.all()
.get(timeoutSeconds, TimeUnit.SECONDS);

if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
log.error("Failed to get topic information for topic: {}", getTopicName());
return Collections.emptyMap();
}

// Create a list of all TopicPartitions
List<TopicPartition> allPartitions =
topicInfo.get(getTopicName()).partitions().stream()
.map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
.collect(Collectors.toList());

// Pre-populate result map from cache if not skipping cache
Map<TopicPartition, Long> result = new HashMap<>();
if (!skipCache) {
for (TopicPartition partition : allPartitions) {
Long cached = endOffsetCache.getIfPresent(partition);
if (cached != null) {
result.put(partition, cached);
}
}

// If we have all partitions from cache and aren't skipping cache, return early
if (result.size() == allPartitions.size()) {
return result;
}
} else {
// If skipping cache, invalidate all entries for these partitions
for (TopicPartition partition : allPartitions) {
endOffsetCache.invalidate(partition);
}
}

// Fetch missing end offsets using a consumer
try (Consumer<String, GenericRecord> consumer = consumerSupplier.get()) {
// Determine which partitions we need to fetch
List<TopicPartition> partitionsToFetch =
allPartitions.stream()
.filter(partition -> skipCache || !result.containsKey(partition))
.collect(Collectors.toList());

if (!partitionsToFetch.isEmpty()) {
// Assign partitions to the consumer
consumer.assign(partitionsToFetch);

// Fetch end offsets for all partitions at once
Map<TopicPartition, Long> fetchedEndOffsets = consumer.endOffsets(partitionsToFetch);

// Update the cache and result map
for (Map.Entry<TopicPartition, Long> entry : fetchedEndOffsets.entrySet()) {
endOffsetCache.put(entry.getKey(), entry.getValue());
result.put(entry.getKey(), entry.getValue());
}
}
}

return result;
} catch (Exception e) {
log.error("Error fetching end offsets for topic {}", getTopicName(), e);
return Collections.emptyMap();
}
}
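
// Reviewer note (not part of this commit): the two accessors above can be combined to estimate
// per-partition consumer lag, the kind of signal an operations endpoint might expose. A sketch,
// assuming a hypothetical KafkaTraceReader instance `traceReader`:
//
//   Map<TopicPartition, OffsetAndMetadata> committed = traceReader.getAllPartitionOffsets(false);
//   Map<TopicPartition, Long> ends = traceReader.getEndOffsets(false);
//   committed.forEach(
//       (tp, meta) ->
//           log.info("{} lag={}", tp, Math.max(0L, ends.getOrDefault(tp, 0L) - meta.offset())));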

/**
* Returns the end offsets for a specific set of partitions.
*
* @param partitions Collection of TopicPartitions to get end offsets for
* @param skipCache Whether to skip the cache when fetching end offsets
* @return Map of TopicPartition to end offset
*/
public Map<TopicPartition, Long> getEndOffsets(
Collection<TopicPartition> partitions, boolean skipCache) {
if (partitions == null || partitions.isEmpty()) {
return Collections.emptyMap();
}

Map<TopicPartition, Long> result = new HashMap<>();
List<TopicPartition> partitionsToFetch = new ArrayList<>();

// Check cache first if not skipping
if (!skipCache) {
for (TopicPartition partition : partitions) {
Long cached = endOffsetCache.getIfPresent(partition);
if (cached != null) {
result.put(partition, cached);
} else {
partitionsToFetch.add(partition);
}
}

// If all partitions were cached, return early
if (partitionsToFetch.isEmpty()) {
return result;
}
} else {
// If skipping cache, fetch all partitions
partitionsToFetch.addAll(partitions);
// Invalidate cache entries
for (TopicPartition partition : partitions) {
endOffsetCache.invalidate(partition);
}
}

// Fetch end offsets for partitions not in cache
try (Consumer<String, GenericRecord> consumer = consumerSupplier.get()) {
consumer.assign(partitionsToFetch);
Map<TopicPartition, Long> fetchedOffsets = consumer.endOffsets(partitionsToFetch);

// Update cache and results
for (Map.Entry<TopicPartition, Long> entry : fetchedOffsets.entrySet()) {
endOffsetCache.put(entry.getKey(), entry.getValue());
result.put(entry.getKey(), entry.getValue());
}
} catch (Exception e) {
log.error("Error fetching end offsets for specific partitions", e);
}

return result;
}
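
// Reviewer note (not part of this commit): a hypothetical caller of the partition-scoped
// overload; skipCache=true bypasses the 5s endOffsetCache and forces a fresh read (the topic
// name below is illustrative only):
//
//   List<TopicPartition> partitions =
//       List.of(new TopicPartition("MetadataChangeLog_Versioned_v1", 0));
//   Map<TopicPartition, Long> fresh = traceReader.getEndOffsets(partitions, true);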

private Map<String, TraceStorageStatus> tracePendingStatuses(
Urn urn,
Collection<String> aspectNames,
Expand Down
Loading
