Skip to content

Commit

Permalink
* search: loading from json into search request
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Aug 9, 2024
1 parent 81a0af2 commit e1f6377
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 32 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
## Change log

### 9.1.2 (8/9/2024 - )

* search: loading from json into search request
> for complex aggregation, refer to ElasticSearchAggregationIntegrationTest.java for usage
### 9.1.1 (7/11/2024 - 8/7/2024)

* ws/sse: updated max process time
Expand All @@ -17,7 +22,6 @@ try (EventSource source = client.sse(request)) {
```

* monitor: fix stats type for mongo 7

* uuid: added uuid v7, can be used as db friendly primary key

### 9.1.0 (6/12/2024 - 7/9/2024)
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ apply(plugin = "project")

subprojects {
group = "core.framework"
version = "9.1.1"
version = "9.1.2"
}

val elasticVersion = "8.14.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
* @author neo
*/
public class TestSearchConfig extends SearchConfig {
static {
// org.apache.lucene.store.MemorySegmentIndexInputProvider <init>
// INFO: Using MemorySegmentIndexInput with Java 21 or later; to disable start with -Dorg.apache.lucene.store.MMapDirectory.enableMemorySegments=false
System.setProperty("org.apache.lucene.store.MMapDirectory.enableMemorySegments", "false");
}

private static final ReentrantLock LOCK = new ReentrantLock();

// only start one local node for testing to reduce resource overhead,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package core.framework.search;

import core.framework.search.impl.TestAggregationDocument;
import core.framework.search.impl.TestDocument;
import core.framework.search.module.InitSearchConfig;
import core.framework.search.module.SearchConfig;
Expand All @@ -18,11 +19,13 @@ protected void initialize() {
search.timeout(Duration.ofSeconds(5));
search.maxResultWindow(1000);
search.type(TestDocument.class);
search.type(TestAggregationDocument.class);

InitSearchConfig initSearch = config(InitSearchConfig.class);
initSearch.putIndex("document", "search-test/document-index.json");
initSearch.putIndexTemplate("document", "search-test/document-index-template.json");
initSearch.flush("document");
initSearch.putIndex("aggregation_document", "search-test/aggregation-document-index.json");

// test multiple search in one app
search = config(SearchConfig.class, "other");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package core.framework.search.impl;

import co.elastic.clients.elasticsearch._types.aggregations.DateHistogramBucket;
import co.elastic.clients.elasticsearch._types.aggregations.StringTermsBucket;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import core.framework.inject.Inject;
import core.framework.search.DeleteByQueryRequest;
import core.framework.search.ElasticSearch;
import core.framework.search.ElasticSearchType;
import core.framework.search.IntegrationTest;
import core.framework.search.SearchRequest;
import core.framework.search.SearchResponse;
import core.framework.util.ClasspathResources;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.time.ZonedDateTime;
import java.util.List;

import static core.framework.search.query.Aggregations.sum;
import static org.assertj.core.api.Assertions.assertThat;

/**
* @author neo
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ElasticSearchAggregationIntegrationTest extends IntegrationTest {
@Inject
ElasticSearch elasticSearch;
@Inject
ElasticSearchType<TestAggregationDocument> documentType;

@BeforeAll
void initialize() {
documentType.index("1", document(ZonedDateTime.parse("2024-08-09T00:00:00Z"), "a1", "b1", 1));
documentType.index("2", document(ZonedDateTime.parse("2024-08-09T00:00:00Z"), "a2", "b2", 2));
documentType.index("3", document(ZonedDateTime.parse("2024-08-10T00:00:00Z"), "a1", "b2", 3));
documentType.index("4", document(ZonedDateTime.parse("2024-08-10T00:00:00Z"), "a2", "b2", 4));
documentType.index("5", document(ZonedDateTime.parse("2024-08-11T00:00:00Z"), "a1", "b1", 5));
documentType.index("6", document(ZonedDateTime.parse("2024-08-11T00:00:00Z"), "a1", "b2", 6));
elasticSearch.refreshIndex("aggregation_document");
}

@AfterAll
void cleanup() {
var request = new DeleteByQueryRequest();
request.query = new Query.Builder().matchAll(b -> b).build();
request.refresh = true;
documentType.deleteByQuery(request);
}

@Test
void aggregate() {
var request = new SearchRequest();
request.limit = 1;
request.aggregations.put("total_value", sum("value"));
SearchResponse<TestAggregationDocument> response = documentType.search(request);

assertThat(response.totalHits).isEqualTo(6);
assertThat(response.hits).hasSize(1);
assertThat(response.aggregations).containsKeys("total_value");

int sum = (int) response.aggregations.get("total_value").sum().value();
assertThat(sum).isEqualTo(21);
}

@Test
void subAggregate() {
var request = new SearchRequest();
request.withJSON(ClasspathResources.text("search-test/sub-aggregation.json"));
request.limit = 0;

SearchResponse<TestAggregationDocument> response = documentType.search(request);
List<DateHistogramBucket> dates = response.aggregations.get("date").dateHistogram().buckets().array();
assertThat(dates.getFirst().keyAsString())
.isEqualTo("2024-08-09T00:00:00.000Z");
assertThat(dates.getFirst().aggregations().get("key_1").sterms().buckets().array().getFirst().key().stringValue())
.isEqualTo("a2");
assertThat(dates.getFirst().aggregations().get("key_1").sterms().buckets().array().getFirst().aggregations().get("total_value").sum().value())
.isEqualTo(2);
assertThat(dates.getFirst().aggregations().get("key_1").sterms().buckets().array().get(1).key().stringValue())
.isEqualTo("a1");
assertThat(dates.getFirst().aggregations().get("key_1").sterms().buckets().array().get(1).aggregations().get("total_value").sum().value())
.isEqualTo(1);

assertThat(dates.get(2).keyAsString())
.isEqualTo("2024-08-11T00:00:00.000Z");
assertThat(dates.get(2).aggregations().get("key_1").sterms().buckets().array().getFirst().key().stringValue())
.isEqualTo("a1");
assertThat(dates.get(2).aggregations().get("key_1").sterms().buckets().array().getFirst().aggregations().get("total_value").sum().value())
.isEqualTo(11);
}

@Test
void subAggregateWithRuntimeField() {
var request = new SearchRequest();
request.withJSON(ClasspathResources.text("search-test/sub-aggregation-with-runtime-field.json"));
request.limit = 0;

SearchResponse<TestAggregationDocument> response = documentType.search(request);
List<StringTermsBucket> buckets = response.aggregations.get("composited_key").sterms().buckets().array();
assertThat(buckets.getFirst().key().stringValue())
.isEqualTo("a1|b2");
assertThat(buckets.getFirst().aggregations().get("total_value").sum().value())
.isEqualTo(9);
}

private TestAggregationDocument document(ZonedDateTime date, String key1, String key2, int value) {
var document = new TestAggregationDocument();
document.date = date;
document.key1 = key1;
document.key2 = key2;
document.value = value;
return document;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
Expand All @@ -34,7 +32,6 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static core.framework.search.query.Aggregations.sum;
import static core.framework.search.query.Queries.match;
import static core.framework.search.query.Queries.range;
import static core.framework.search.query.Queries.term;
Expand All @@ -54,10 +51,10 @@ class ElasticSearchIntegrationTest extends IntegrationTest {

@AfterEach
void cleanup() {
var request = new BulkDeleteRequest();
request.ids = range(0, 100).mapToObj(String::valueOf).toList();
request.refresh = Boolean.TRUE;
documentType.bulkDelete(request);
var request = new DeleteByQueryRequest();
request.query = new Query.Builder().matchAll(b -> b).build();
request.refresh = true;
documentType.deleteByQuery(request);
}

@Test
Expand Down Expand Up @@ -255,28 +252,6 @@ void partialUpdate() {
assertThat(updated).isFalse();
}

@Test
void aggregate() {
documentType.index("1", document("1", "value1", 0, 19.13, null, null));
documentType.index("2", document("2", "value1", 0, 0.01, null, null));
documentType.index("3", document("3", "value3", 0, 1.5, null, null));
elasticSearch.refreshIndex("document");

var request = new SearchRequest();
request.skip = 0;
request.limit = 1;
request.query = new Query.Builder().match(match("string_field", "value1")).build();
request.aggregations.put("totalValue", sum("double_field"));
SearchResponse<TestDocument> response = documentType.search(request);

assertThat(response.totalHits).isEqualTo(2);
assertThat(response.hits).hasSize(1);
assertThat(response.aggregations).containsKeys("totalValue");

var sum = BigDecimal.valueOf(response.aggregations.get("totalValue").sum().value()).setScale(4, RoundingMode.HALF_UP);
assertThat(sum).isEqualTo("19.1400");
}

@Test
void trackTotalHits() {
Map<String, TestDocument> documents = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package core.framework.search.impl;

import core.framework.api.json.Property;
import core.framework.api.validate.NotNull;
import core.framework.search.Index;

import java.time.ZonedDateTime;

/**
* @author neo
*/
@Index(name = "aggregation_document")
public class TestAggregationDocument {
@Property(name = "date")
public ZonedDateTime date;

@Property(name = "key_1")
public String key1;

@Property(name = "key_2")
public String key2;

@NotNull
@Property(name = "value")
public Integer value;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"mappings": {
"properties": {
"date": {
"type": "date",
"format": "strict_date_optional_time"
},
"key_1": {
"type": "keyword"
},
"key_2": {
"type": "keyword"
},
"value": {
"type": "integer"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"runtime_mappings": {
"composited_key": {
"type": "keyword",
"script": "emit(doc['key_1'].value + '|' + doc['key_2'].value)"
}
},
"aggregations": {
"composited_key": {
"terms": {
"field": "composited_key",
"order": [{"total_value": "desc"}]
},
"aggregations": {
"total_value": {
"sum": {"field": "value"}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"aggregations": {
"date": {
"date_histogram": {
"field": "date",
"calendar_interval": "day"
},
"aggregations": {
"key_1": {
"terms": {
"field": "key_1",
"order": [{"total_value": "desc"}]
},
"aggregations": {
"total_value": {
"sum": {"field": "value"}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import co.elastic.clients.elasticsearch._types.SearchType;
import co.elastic.clients.elasticsearch._types.SortOptions;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregation;
import co.elastic.clients.elasticsearch._types.mapping.RuntimeField;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import core.framework.util.Lists;
import core.framework.util.Maps;

import javax.annotation.Nullable;
import java.io.StringReader;
import java.util.List;
import java.util.Map;

Expand All @@ -16,6 +18,7 @@
*/
public class SearchRequest {
public final Map<String, Aggregation> aggregations = Maps.newHashMap();
public final Map<String, RuntimeField> runtimeFields = Maps.newHashMap();
public final List<SortOptions> sorts = Lists.newArrayList();
@Nullable
public String index;
Expand All @@ -32,4 +35,12 @@ public class SearchRequest {
public void trackTotalHits() {
trackTotalHitsUpTo = Integer.MAX_VALUE;
}

public void withJSON(String source) {
var request = co.elastic.clients.elasticsearch.core.SearchRequest.of(b -> b.withJson(new StringReader(source)));
if (request.query() != null) query = request.query();
if (request.aggregations() != null) aggregations.putAll(request.aggregations());
if (request.runtimeMappings() != null) runtimeFields.putAll(request.runtimeMappings());
if (request.sort() != null) sorts.addAll(request.sort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public SearchResponse<T> search(SearchRequest request) {
int hits = 0;
try {
var searchRequest = co.elastic.clients.elasticsearch.core.SearchRequest.of(builder -> {
builder.index(index).query(request.query).aggregations(request.aggregations).sort(request.sorts)
builder.index(index).query(request.query).runtimeMappings(request.runtimeFields).aggregations(request.aggregations).sort(request.sorts)
.searchType(request.type)
.from(request.skip)
.size(request.limit)
Expand Down

0 comments on commit e1f6377

Please sign in to comment.