Skip to content

Commit 6cc8da5

Browse files
authored
Change InternalSignificantTerms to only sum shard level counts in final reduce (#8735)
Signed-off-by: Jay Deng <[email protected]>
1 parent 27a14c7 commit 6cc8da5

File tree

10 files changed

+102
-9
lines changed

10 files changed

+102
-9
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8484
### Changed
8585
- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))
8686
- Make Span exporter configurable ([#8620](https://github.com/opensearch-project/OpenSearch/issues/8620))
87+
- Change InternalSignificantTerms to sum shard-level superset counts only in final reduce ([#8735](https://github.com/opensearch-project/OpenSearch/pull/8735))
8788

8889
### Deprecated
8990

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/SignificantTermsSignificanceScoreIT.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package org.opensearch.search.aggregations.bucket;
3333

34+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
3435
import org.opensearch.action.index.IndexRequestBuilder;
3536
import org.opensearch.action.search.SearchRequestBuilder;
3637
import org.opensearch.action.search.SearchResponse;
@@ -42,6 +43,7 @@
4243
import org.opensearch.common.xcontent.XContentType;
4344
import org.opensearch.index.query.QueryBuilder;
4445
import org.opensearch.index.query.QueryBuilders;
46+
import org.opensearch.index.query.TermQueryBuilder;
4547
import org.opensearch.plugins.Plugin;
4648
import org.opensearch.plugins.SearchPlugin;
4749
import org.opensearch.script.MockScriptPlugin;
@@ -210,6 +212,34 @@ public void testXContentResponse() throws Exception {
210212

211213
}
212214

215+
public void testConsistencyWithDifferentShardCounts() throws Exception {
216+
// The purpose of this test is to validate that the aggregation results do not change with shard count.
217+
// bg_count for significant term agg is summed up across shards, so in this test we compare a 1 shard and 2 shard search request
218+
String type = randomBoolean() ? "text" : "long";
219+
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
220+
SharedSignificantTermsTestMethods.index01Docs(type, settings, this);
221+
222+
SearchRequestBuilder request = client().prepareSearch(INDEX_NAME)
223+
.setQuery(new TermQueryBuilder(CLASS_FIELD, "0"))
224+
.addAggregation((significantTerms("sig_terms").field(TEXT_FIELD)));
225+
226+
SearchResponse response1 = request.get();
227+
228+
assertAcked(client().admin().indices().delete(new DeleteIndexRequest("*")).get());
229+
230+
settings = "{\"index.number_of_shards\": 2, \"index.number_of_replicas\": 0}";
231+
// We use a custom routing strategy here to ensure that each shard will have at least 1 bucket.
232+
// If there are no buckets collected for a shard, then that will affect the scoring and bg_count and our assertion will not be
233+
// valid.
234+
SharedSignificantTermsTestMethods.index01DocsWithRouting(type, settings, this);
235+
SearchResponse response2 = request.get();
236+
237+
assertEquals(
238+
response1.getAggregations().asMap().get("sig_terms").toString(),
239+
response2.getAggregations().asMap().get("sig_terms").toString()
240+
);
241+
}
242+
213243
public void testPopularTermManyDeletedDocs() throws Exception {
214244
String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
215245
assertAcked(

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -917,8 +917,10 @@ public ReaderContext readerContext() {
917917
}
918918

919919
@Override
920-
public InternalAggregation.ReduceContext partial() {
921-
return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
920+
public InternalAggregation.ReduceContext partialOnShard() {
921+
InternalAggregation.ReduceContext rc = requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
922+
rc.setSliceLevel(isConcurrentSegmentSearchEnabled());
923+
return rc;
922924
}
923925

924926
@Override

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
7070
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
7171
// documents are collected across shards for an aggregation
7272
return new AggregationReduceableSearchResult(
73-
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partial())
73+
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
7474
);
7575
} else {
7676
return new AggregationReduceableSearchResult(internalAggregations);

server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ public static class ReduceContext {
8989
private final ScriptService scriptService;
9090
private final IntConsumer multiBucketConsumer;
9191
private final PipelineTree pipelineTreeRoot;
92+
93+
private boolean isSliceLevel;
9294
/**
9395
* Supplies the pipelines when the result of the reduce is serialized
9496
* to node versions that need pipeline aggregators to be serialized
@@ -138,6 +140,7 @@ private ReduceContext(
138140
this.multiBucketConsumer = multiBucketConsumer;
139141
this.pipelineTreeRoot = pipelineTreeRoot;
140142
this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
143+
this.isSliceLevel = false;
141144
}
142145

143146
/**
@@ -149,6 +152,14 @@ public boolean isFinalReduce() {
149152
return pipelineTreeRoot != null;
150153
}
151154

155+
public void setSliceLevel(boolean sliceLevel) {
156+
this.isSliceLevel = sliceLevel;
157+
}
158+
159+
public boolean isSliceLevel() {
160+
return this.isSliceLevel;
161+
}
162+
152163
public BigArrays bigArrays() {
153164
return bigArrays;
154165
}

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,13 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
232232
@SuppressWarnings("unchecked")
233233
InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
234234
globalSubsetSize += terms.getSubsetSize();
235-
globalSupersetSize += terms.getSupersetSize();
235+
// supersetSize is a shard level count, if we sum it across slices we would produce num_slices_with_bucket * supersetSize where
236+
// num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
237+
if (reduceContext.isSliceLevel()) {
238+
globalSupersetSize = terms.getSupersetSize();
239+
} else {
240+
globalSupersetSize += terms.getSupersetSize();
241+
}
236242
}
237243
Map<String, List<B>> buckets = new HashMap<>();
238244
for (InternalAggregation aggregation : aggregations) {
@@ -291,7 +297,13 @@ protected B reduceBucket(List<B> buckets, ReduceContext context) {
291297
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
292298
for (B bucket : buckets) {
293299
subsetDf += bucket.subsetDf;
294-
supersetDf += bucket.supersetDf;
300+
// supersetDf is a shard level count, if we sum it across slices we would produce num_slices_with_bucket * supersetSize where
301+
// num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
302+
if (context.isSliceLevel()) {
303+
supersetDf = bucket.supersetDf;
304+
} else {
305+
supersetDf += bucket.supersetDf;
306+
}
295307
aggregationsList.add(bucket.aggregations);
296308
}
297309
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);

server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ public ReaderContext readerContext() {
546546
}
547547

548548
@Override
549-
public InternalAggregation.ReduceContext partial() {
550-
return in.partial();
549+
public InternalAggregation.ReduceContext partialOnShard() {
550+
return in.partialOnShard();
551551
}
552552

553553
@Override

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ public String toString() {
465465

466466
public abstract ReaderContext readerContext();
467467

468-
public abstract InternalAggregation.ReduceContext partial();
468+
public abstract InternalAggregation.ReduceContext partialOnShard();
469469

470470
// processor used for bucket collectors
471471
public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

server/src/test/java/org/opensearch/test/search/aggregations/bucket/SharedSignificantTermsTestMethods.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,4 +113,41 @@ public static void index01Docs(String type, String settings, OpenSearchIntegTest
113113
indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("7").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0"));
114114
testCase.indexRandom(true, false, indexRequestBuilderList);
115115
}
116+
117+
public static void index01DocsWithRouting(String type, String settings, OpenSearchIntegTestCase testCase) throws ExecutionException,
118+
InterruptedException {
119+
String textMappings = "type=" + type;
120+
if (type.equals("text")) {
121+
textMappings += ",fielddata=true";
122+
}
123+
assertAcked(
124+
testCase.prepareCreate(INDEX_NAME)
125+
.setSettings(settings, XContentType.JSON)
126+
.setMapping("text", textMappings, CLASS_FIELD, "type=keyword")
127+
);
128+
String[] gb = { "0", "1" };
129+
List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
130+
indexRequestBuilderList.add(
131+
client().prepareIndex(INDEX_NAME).setId("1").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1").setRouting("0")
132+
);
133+
indexRequestBuilderList.add(
134+
client().prepareIndex(INDEX_NAME).setId("2").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1").setRouting("0")
135+
);
136+
indexRequestBuilderList.add(
137+
client().prepareIndex(INDEX_NAME).setId("3").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("0")
138+
);
139+
indexRequestBuilderList.add(
140+
client().prepareIndex(INDEX_NAME).setId("4").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("1")
141+
);
142+
indexRequestBuilderList.add(
143+
client().prepareIndex(INDEX_NAME).setId("5").setSource(TEXT_FIELD, gb, CLASS_FIELD, "1").setRouting("1")
144+
);
145+
indexRequestBuilderList.add(
146+
client().prepareIndex(INDEX_NAME).setId("6").setSource(TEXT_FIELD, gb, CLASS_FIELD, "0").setRouting("0")
147+
);
148+
indexRequestBuilderList.add(
149+
client().prepareIndex(INDEX_NAME).setId("7").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("0")
150+
);
151+
testCase.indexRandom(true, false, indexRequestBuilderList);
152+
}
116153
}

test/framework/src/main/java/org/opensearch/test/TestSearchContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ public ReaderContext readerContext() {
659659
}
660660

661661
@Override
662-
public InternalAggregation.ReduceContext partial() {
662+
public InternalAggregation.ReduceContext partialOnShard() {
663663
return InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction();
664664
}
665665

0 commit comments

Comments
 (0)