Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operations): ES and Kafka Operations Endpoints #12756

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;
Expand Down Expand Up @@ -401,6 +403,21 @@ public Optional<SearchResponse> raw(
return esSearchDAO.raw(opContext, indexName, jsonQuery);
}

@Override
@Nonnull
public Map<Urn, Map<String, Object>> raw(
    @Nonnull OperationContext opContext, @Nonnull Set<Urn> urns) {
  // Resolve each urn to its raw index document: run the per-urn searches, drop
  // urns whose response carried no hits, and keep the first hit's source map.
  return esSearchDAO.rawEntity(opContext, urns).entrySet().stream()
      .filter(
          entry ->
              entry.getValue().getHits().getHits() != null
                  && entry.getValue().getHits().getHits().length > 0)
      .collect(
          Collectors.toMap(
              Map.Entry::getKey,
              entry -> entry.getValue().getHits().getHits()[0].getSourceAsMap()));
}

@Override
public int maxResultSize() {
return ESUtils.MAX_RESULT_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.aspect.patch.template.TemplateUtil.*;
import static com.linkedin.metadata.timeseries.elastic.indexbuilder.MappingsBuilder.URN_FIELD;
import static com.linkedin.metadata.utils.SearchUtil.*;

import com.datahub.util.exception.ESQueryException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.metadata.config.search.SearchConfiguration;
import com.linkedin.metadata.config.search.custom.CustomSearchConfiguration;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.AutoCompleteResult;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.query.filter.SortCriterion;
Expand All @@ -37,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
Expand All @@ -58,6 +62,7 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

Expand Down Expand Up @@ -615,6 +620,43 @@ public Optional<SearchResponse> raw(
});
}

/**
 * Fetches the raw search-index response for each urn by issuing a term query on the urn field
 * against the index derived from the urn's entity type.
 *
 * <p>Urns whose entity type has no spec in the {@link EntityRegistry} are silently skipped.
 * Searches are issued one request per urn, sequentially.
 *
 * @param opContext operation context supplying the entity registry and index convention
 * @param urns the entity urns to look up
 * @return map from urn to the {@link SearchResponse} for that urn's index
 * @throws IllegalStateException if a search request fails with an I/O error (cause preserved)
 */
public Map<Urn, SearchResponse> rawEntity(@Nonnull OperationContext opContext, Set<Urn> urns) {
  EntityRegistry entityRegistry = opContext.getEntityRegistry();
  // Single pass: resolve the spec, build and execute the search. Unknown entity
  // types are dropped via the empty Optional rather than failing the whole call.
  return urns.stream()
      .flatMap(
          urn ->
              Optional.ofNullable(entityRegistry.getEntitySpec(urn.getEntityType()))
                  .map(spec -> Map.entry(urn, spec))
                  .stream())
      .map(
          entry -> {
            Urn urn = entry.getKey();
            try {
              String indexName =
                  opContext.getSearchContext().getIndexConvention().getIndexName(entry.getValue());

              SearchSourceBuilder searchSourceBuilder =
                  new SearchSourceBuilder()
                      .query(QueryBuilders.termQuery(URN_FIELD, urn.toString()));

              SearchRequest searchRequest =
                  new SearchRequest(indexName).source(searchSourceBuilder);

              return Map.entry(urn, client.search(searchRequest, RequestOptions.DEFAULT));
            } catch (IOException e) {
              // IllegalStateException is a RuntimeException, so existing callers catching
              // RuntimeException still work; the message identifies the failing urn.
              throw new IllegalStateException("Failed to fetch raw document for urn " + urn, e);
            }
          })
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private boolean supportsPointInTime() {
return pointInTimeCreationEnabled
&& ELASTICSEARCH_IMPLEMENTATION_ELASTICSEARCH.equalsIgnoreCase(elasticSearchImplementation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.datahubproject.openapi.v1.models.TraceStorageStatus;
import io.datahubproject.openapi.v1.models.TraceWriteStatus;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -62,7 +63,14 @@ public abstract class KafkaTraceReader<T extends RecordTemplate> {
private final Cache<TopicPartition, OffsetAndMetadata> offsetCache =
Caffeine.newBuilder()
.maximumSize(100) // unlikely to have more than 100 partitions
.expireAfterWrite(Duration.ofMinutes(5)) // Shorter expiry for offsets
.expireAfterWrite(
Duration.ofMinutes(5)) // Short expiry since consumer group offsets change frequently
.build();
private final Cache<TopicPartition, Long> endOffsetCache =
Caffeine.newBuilder()
.maximumSize(100) // Match the size of offsetCache
.expireAfterWrite(
Duration.ofSeconds(5)) // Short expiry since end offsets change frequently
.build();

public KafkaTraceReader(
Expand Down Expand Up @@ -218,6 +226,225 @@ public Map<Urn, Map<String, TraceStorageStatus>> tracePendingStatuses(
}
}

/**
* Returns the current consumer group offsets for all partitions of the topic.
*
* @param skipCache Whether to skip the cache when fetching offsets
* @return Map of TopicPartition to OffsetAndMetadata, empty map if no offsets found or error
* occurs
*/
public Map<TopicPartition, OffsetAndMetadata> getAllPartitionOffsets(boolean skipCache) {
  final String consumerGroupId = getConsumerGroupId();
  if (consumerGroupId == null) {
    log.warn("Cannot get partition offsets: consumer group ID is null");
    return Collections.emptyMap();
  }

  try {
    // Get all topic partitions first. This AdminClient call blocks for up to
    // timeoutSeconds before the .get() times out.
    Map<String, TopicDescription> topicInfo =
        adminClient
            .describeTopics(Collections.singletonList(getTopicName()))
            .all()
            .get(timeoutSeconds, TimeUnit.SECONDS);

    if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
      log.error("Failed to get topic information for topic: {}", getTopicName());
      return Collections.emptyMap();
    }

    // Create a list of all TopicPartitions
    List<TopicPartition> allPartitions =
        topicInfo.get(getTopicName()).partitions().stream()
            .map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
            .collect(Collectors.toList());

    // For each partition that exists in the cache and wasn't requested to skip,
    // pre-populate the result map
    Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
    if (!skipCache) {
      for (TopicPartition partition : allPartitions) {
        OffsetAndMetadata cached = offsetCache.getIfPresent(partition);
        if (cached != null) {
          result.put(partition, cached);
        }
      }
    }

    // If we have all partitions from cache and aren't skipping cache, return early
    // (avoids the listConsumerGroupOffsets round trip entirely).
    if (!skipCache && result.size() == allPartitions.size()) {
      return result;
    }

    // Get all offsets for the consumer group. NOTE: on a partial cache hit we
    // fall through here and re-fetch ALL offsets; the cached entries gathered
    // above are superseded, not merged (see the final return of topicOffsets).
    ListConsumerGroupOffsetsResult offsetsResult =
        adminClient.listConsumerGroupOffsets(consumerGroupId);
    if (offsetsResult == null) {
      // Best-effort fallback: return whatever (possibly partial) cached offsets we gathered.
      log.error("Failed to get consumer group offsets for group: {}", consumerGroupId);
      return result;
    }

    Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
        offsetsResult.partitionsToOffsetAndMetadata().get(timeoutSeconds, TimeUnit.SECONDS);

    if (fetchedOffsets == null) {
      // Same best-effort fallback as above: partial cached data is better than nothing.
      log.error("Null offsets returned for consumer group: {}", consumerGroupId);
      return result;
    }

    // Filter to only keep offsets for our topic (the group may consume other topics).
    Map<TopicPartition, OffsetAndMetadata> topicOffsets =
        fetchedOffsets.entrySet().stream()
            .filter(entry -> entry.getKey().topic().equals(getTopicName()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Update the cache for each offset
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicOffsets.entrySet()) {
      offsetCache.put(entry.getKey(), entry.getValue());
    }

    // Return all offsets (freshly fetched; any stale cached values from `result` are discarded)
    return topicOffsets;
  } catch (Exception e) {
    // Broad catch is intentional: failures (timeouts, interrupts, Kafka errors)
    // degrade to an empty map rather than propagating to the caller.
    log.error("Error fetching all partition offsets for topic {}", getTopicName(), e);
    return Collections.emptyMap();
  }
}

/**
* Returns the end offsets (latest offsets) for all partitions of the topic.
*
* @param skipCache Whether to skip the cache when fetching end offsets
* @return Map of TopicPartition to end offset, empty map if no offsets found or error occurs
*/
public Map<TopicPartition, Long> getEndOffsets(boolean skipCache) {
  try {
    // Get all topic partitions first (reuse the same approach as in getAllPartitionOffsets).
    // Blocks for up to timeoutSeconds.
    Map<String, TopicDescription> topicInfo =
        adminClient
            .describeTopics(Collections.singletonList(getTopicName()))
            .all()
            .get(timeoutSeconds, TimeUnit.SECONDS);

    if (topicInfo == null || !topicInfo.containsKey(getTopicName())) {
      log.error("Failed to get topic information for topic: {}", getTopicName());
      return Collections.emptyMap();
    }

    // Create a list of all TopicPartitions
    List<TopicPartition> allPartitions =
        topicInfo.get(getTopicName()).partitions().stream()
            .map(partitionInfo -> new TopicPartition(getTopicName(), partitionInfo.partition()))
            .collect(Collectors.toList());

    // Pre-populate result map from cache if not skipping cache
    Map<TopicPartition, Long> result = new HashMap<>();
    if (!skipCache) {
      for (TopicPartition partition : allPartitions) {
        Long cached = endOffsetCache.getIfPresent(partition);
        if (cached != null) {
          result.put(partition, cached);
        }
      }

      // If we have all partitions from cache and aren't skipping cache, return early
      if (result.size() == allPartitions.size()) {
        return result;
      }
    } else {
      // If skipping cache, invalidate all entries for these partitions so stale
      // values cannot be served before the fresh fetch below completes.
      for (TopicPartition partition : allPartitions) {
        endOffsetCache.invalidate(partition);
      }
    }

    // Fetch missing end offsets using a consumer (closed automatically by try-with-resources)
    try (Consumer<String, GenericRecord> consumer = consumerSupplier.get()) {
      // Determine which partitions we need to fetch: all of them when skipping
      // cache, otherwise only the ones the cache did not cover.
      List<TopicPartition> partitionsToFetch =
          allPartitions.stream()
              .filter(partition -> skipCache || !result.containsKey(partition))
              .collect(Collectors.toList());

      if (!partitionsToFetch.isEmpty()) {
        // Assign partitions to the consumer
        // NOTE(review): endOffsets() does not require assignment — this assign()
        // looks redundant; confirm before removing.
        consumer.assign(partitionsToFetch);

        // Fetch end offsets for all partitions at once
        Map<TopicPartition, Long> fetchedEndOffsets = consumer.endOffsets(partitionsToFetch);

        // Update the cache and result map
        for (Map.Entry<TopicPartition, Long> entry : fetchedEndOffsets.entrySet()) {
          endOffsetCache.put(entry.getKey(), entry.getValue());
          result.put(entry.getKey(), entry.getValue());
        }
      }
    }

    // Merged view: cached values (when allowed) plus freshly fetched ones.
    return result;
  } catch (Exception e) {
    // Best-effort: any failure degrades to an empty map rather than propagating.
    log.error("Error fetching end offsets for topic {}", getTopicName(), e);
    return Collections.emptyMap();
  }
}

/**
* Returns the end offsets for a specific set of partitions.
*
* @param partitions Collection of TopicPartitions to get end offsets for
* @param skipCache Whether to skip the cache when fetching end offsets
* @return Map of TopicPartition to end offset
*/
public Map<TopicPartition, Long> getEndOffsets(
    Collection<TopicPartition> partitions, boolean skipCache) {
  if (partitions == null || partitions.isEmpty()) {
    return Collections.emptyMap();
  }

  final Map<TopicPartition, Long> endOffsets = new HashMap<>();
  final List<TopicPartition> toFetch = new ArrayList<>();

  if (skipCache) {
    // Fresh read requested: drop any cached values and fetch every partition.
    for (TopicPartition partition : partitions) {
      endOffsetCache.invalidate(partition);
    }
    toFetch.addAll(partitions);
  } else {
    // Serve what we can from cache and collect the misses for a single fetch.
    for (TopicPartition partition : partitions) {
      Long cachedOffset = endOffsetCache.getIfPresent(partition);
      if (cachedOffset == null) {
        toFetch.add(partition);
      } else {
        endOffsets.put(partition, cachedOffset);
      }
    }
    // Fully served from cache — no consumer needed.
    if (toFetch.isEmpty()) {
      return endOffsets;
    }
  }

  // Fetch the remaining end offsets with a short-lived consumer; any failure is
  // logged and the (possibly partial) cached results are returned as-is.
  try (Consumer<String, GenericRecord> consumer = consumerSupplier.get()) {
    consumer.assign(toFetch);
    for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(toFetch).entrySet()) {
      endOffsetCache.put(entry.getKey(), entry.getValue());
      endOffsets.put(entry.getKey(), entry.getValue());
    }
  } catch (Exception e) {
    log.error("Error fetching end offsets for specific partitions", e);
  }

  return endOffsets;
}

private Map<String, TraceStorageStatus> tracePendingStatuses(
Urn urn,
Collection<String> aspectNames,
Expand Down
Loading
Loading