diff --git a/CHANGELOG.md b/CHANGELOG.md index 42bfc456cf3d5..6ff8d9b930dde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add fetch phase profiling. ([#18664](https://github.com/opensearch-project/OpenSearch/pull/18664)) - Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697)) - Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663)) +- Optimization in Numeric Terms Aggregation query for Large Bucket Counts ([#18702](https://github.com/opensearch-project/OpenSearch/pull/18702)) - Disable approximation framework when dealing with multiple sorts ([#18763](https://github.com/opensearch-project/OpenSearch/pull/18763)) - [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479)) - [Star-Tree] Add star-tree search related stats ([#18707](https://github.com/opensearch-project/OpenSearch/pull/18707)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 792353908d2f2..18861b0711404 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -556,6 +556,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.MAX_KEEPALIVE_SETTING, SearchService.ALLOW_EXPENSIVE_QUERIES, MultiBucketConsumerService.MAX_BUCKET_SETTING, + SearchService.BUCKET_SELECTION_STRATEGY_FACTOR_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, SearchService.MAX_OPEN_PIT_CONTEXT, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java 
b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 4688bfece3ced..dda3e203c0667 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -121,6 +121,7 @@ import java.util.function.LongSupplier; import static org.opensearch.search.SearchService.AGGREGATION_REWRITE_FILTER_SEGMENT_THRESHOLD; +import static org.opensearch.search.SearchService.BUCKET_SELECTION_STRATEGY_FACTOR_SETTING; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; @@ -215,6 +216,7 @@ final class DefaultSearchContext extends SearchContext { private final int maxAggRewriteFilters; private final int filterRewriteSegmentThreshold; private final int cardinalityAggregationPruningThreshold; + private final int bucketSelectionStrategyFactor; private final boolean keywordIndexOrDocValuesEnabled; private final boolean isStreamSearch; @@ -280,6 +282,7 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.filterRewriteSegmentThreshold = evaluateAggRewriteFilterSegThreshold(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); + this.bucketSelectionStrategyFactor = evaluateBucketSelectionStrategyFactor(); this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); this.isStreamSearch = isStreamSearch; @@ -1230,6 +1233,11 @@ public int cardinalityAggregationPruningThreshold() { return cardinalityAggregationPruningThreshold; } + @Override + public int bucketSelectionStrategyFactor() { + return bucketSelectionStrategyFactor; + } + @Override public boolean 
keywordIndexOrDocValuesEnabled() { return keywordIndexOrDocValuesEnabled; @@ -1242,6 +1250,13 @@ private int evaluateCardinalityAggregationPruningThreshold() { return 0; } + private int evaluateBucketSelectionStrategyFactor() { + if (clusterService != null) { + return clusterService.getClusterSettings().get(BUCKET_SELECTION_STRATEGY_FACTOR_SETTING); + } + return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR; + } + public boolean evaluateKeywordIndexOrDocValuesEnabled() { if (clusterService != null) { return clusterService.getClusterSettings().get(KEYWORD_INDEX_OR_DOC_VALUES_ENABLED); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index e7ef76f0a3b27..beecab73ffeab 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -369,6 +369,16 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final int DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR = 5; + public static final Setting BUCKET_SELECTION_STRATEGY_FACTOR_SETTING = Setting.intSetting( + "search.aggregation.bucket_selection_strategy_factor", + DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR, + 0, + 10, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/BucketSelectionStrategy.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/BucketSelectionStrategy.java new file mode 100644 index 0000000000000..de63fb05c0a5d --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/BucketSelectionStrategy.java @@ -0,0 +1,238 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be 
licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket.terms; + +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.PriorityQueue; +import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.InternalMultiBucketAggregation; +import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; +import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.function.Supplier; + +import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; + +/** + * Strategy for selecting top buckets from aggregation results. + * + */ +enum BucketSelectionStrategy { + PRIORITY_QUEUE { + @Override + public SelectionResult selectTopBuckets(SelectionInput input) + throws IOException { + PriorityQueue ordered = input.buildPriorityQueue.buildPriorityQueue(input.size); + B spare = null; + long otherDocCount = 0; + + while (input.ordsEnum.next()) { + long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord()); + otherDocCount += docCount; + if (docCount < input.localBucketCountThresholds.getMinDocCount()) { + continue; + } + if (spare == null) { + spare = input.emptyBucketBuilder.get(); + } + input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount); + spare = ordered.insertWithOverflow(spare); + } + + B[] topBuckets = input.bucketArrayBuilder.buildBuckets(ordered.size()); + if (isKeyOrder(input.order)) { + for (int b = ordered.size() - 1; b >= 0; --b) { + topBuckets[b] = ordered.pop(); + otherDocCount -= topBuckets[b].getDocCount(); + } + } else { + Iterator itr = ordered.iterator(); + for (int b = ordered.size() - 1; b >= 0; --b) { + topBuckets[b] = itr.next(); + otherDocCount -= topBuckets[b].getDocCount(); + } + } + + return new 
SelectionResult<>(topBuckets, otherDocCount, "priority_queue"); + } + }, + // Materializes every qualifying bucket, then either quick-selects the top `size` of them or returns them all. + QUICK_SELECT_OR_SELECT_ALL { + @Override + public SelectionResult selectTopBuckets(SelectionInput input) + throws IOException { + B[] bucketsForOrd = input.bucketArrayBuilder.buildBuckets((int) input.bucketsInOrd); + int validBucketCount = 0; + long otherDocCount = 0; + + // Keep every bucket that meets the minimum doc count; otherDocCount starts as the total over ALL buckets + while (input.ordsEnum.next()) { + long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord()); + otherDocCount += docCount; + if (docCount < input.localBucketCountThresholds.getMinDocCount()) { + continue; + } + + B spare = input.emptyBucketBuilder.get(); + input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount); + bucketsForOrd[validBucketCount++] = spare; + } + + B[] topBuckets; + String actualStrategy; + if (validBucketCount > input.size) { + ArrayUtil.select( + bucketsForOrd, + 0, + validBucketCount, + input.size, + (b1, b2) -> input.partiallyBuiltBucketComparator.compare((InternalTerms.Bucket) b1, (InternalTerms.Bucket) b2) + ); + topBuckets = Arrays.copyOf(bucketsForOrd, input.size); + actualStrategy = "quick_select"; + } else { + // Return all valid buckets (no selection needed) + topBuckets = Arrays.copyOf(bucketsForOrd, validBucketCount); + actualStrategy = "select_all"; + } + // Subtract only the returned buckets: those dropped by the min doc count must remain in otherDocCount, as in PRIORITY_QUEUE + for (B bucket : topBuckets) { + otherDocCount -= bucket.getDocCount(); + } + + return new SelectionResult<>(topBuckets, otherDocCount, actualStrategy); + } + }; + + public static BucketSelectionStrategy determine( + int size, + long bucketsInOrd, + BucketOrder order, + Comparator> partiallyBuiltBucketComparator, + int factor + ) { + /* + We select the strategy based on the following conditions, where factor is the configurable threshold factor: + case 1: size * factor < bucketsInOrd (size below 20% of bucketsInOrd with the default factor of 5): PRIORITY_QUEUE + case 2: size * factor >= bucketsInOrd and more qualifying buckets than size: QUICK_SELECT + case 3: no more qualifying buckets than size: return all buckets + case 2 and 3 are encapsulated in 
QUICK_SELECT_OR_SELECT_ALL method. + + Along with the above conditions, we also go with the original PRIORITY_QUEUE based approach + if isKeyOrder or its significant term aggregation. + + if factor is 0, always use PRIORITY_QUEUE strategy (since 0 < bucketsInOrd is always true). + */ + if (((long) size * factor < bucketsInOrd) || isKeyOrder(order) || partiallyBuiltBucketComparator == null) { + return PRIORITY_QUEUE; + } else { + return QUICK_SELECT_OR_SELECT_ALL; + } + } + + public abstract SelectionResult selectTopBuckets(SelectionInput input) + throws IOException; + + /** + * Represents the inputs for strategy execution to select buckets + */ + public static class SelectionInput { + public final int size; + public final long bucketsInOrd; + public final BucketOrdsEnum ordsEnum; + public final Supplier emptyBucketBuilder; + public final LocalBucketCountThresholds localBucketCountThresholds; + public final int ordIdx; + public final BucketOrder order; + public final PriorityQueueBuilder buildPriorityQueue; + public final BucketArrayBuilder bucketArrayBuilder; + public final BucketUpdateFunction bucketUpdateFunction; + public final BucketDocCountFunction bucketDocCountFunction; + public final Comparator> partiallyBuiltBucketComparator; + + public SelectionInput( + int size, + long bucketsInOrd, + BucketOrdsEnum ordsEnum, + Supplier emptyBucketBuilder, + LocalBucketCountThresholds localBucketCountThresholds, + int ordIdx, + BucketOrder order, + PriorityQueueBuilder buildPriorityQueue, + BucketArrayBuilder bucketArrayBuilder, + BucketUpdateFunction bucketUpdateFunction, + BucketDocCountFunction bucketDocCountFunction, + Comparator> partiallyBuiltBucketComparator + ) { + this.size = size; + this.bucketsInOrd = bucketsInOrd; + this.ordsEnum = ordsEnum; + this.emptyBucketBuilder = emptyBucketBuilder; + this.localBucketCountThresholds = localBucketCountThresholds; + this.ordIdx = ordIdx; + this.order = order; + this.buildPriorityQueue = buildPriorityQueue; + 
this.bucketArrayBuilder = bucketArrayBuilder; + this.bucketUpdateFunction = bucketUpdateFunction; + this.bucketDocCountFunction = bucketDocCountFunction; + this.partiallyBuiltBucketComparator = partiallyBuiltBucketComparator; + } + } + + /** + * Represents the results strategy execution to select buckets + */ + public static class SelectionResult { + public final B[] topBuckets; + public final long otherDocCount; + public final String actualStrategyUsed; + + public SelectionResult(B[] topBuckets, long otherDocCount, String actualStrategyUsed) { + this.topBuckets = topBuckets; + this.otherDocCount = otherDocCount; + this.actualStrategyUsed = actualStrategyUsed; + } + } + + /** + * Interface for bucketDocCount method + */ + @FunctionalInterface + public interface BucketDocCountFunction { + long bucketDocCount(long ord) throws IOException; + } + + /** + * Interface for updateBucket method + */ + @FunctionalInterface + public interface BucketUpdateFunction { + void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException; + } + + /** + * Interface for buildBuckets method + */ + @FunctionalInterface + public interface BucketArrayBuilder { + B[] buildBuckets(int size); + } + + /** + * Interface for buildPriorityQueue method + */ + @FunctionalInterface + public interface PriorityQueueBuilder { + PriorityQueue buildPriorityQueue(int size); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 29c55f55db881..0cc2c1940200b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -73,7 +73,6 @@ import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -94,6 +93,8 @@ public class NumericTermsAggregator extends TermsAggregator implements StarTreeP private final LongKeyedBucketOrds bucketOrds; private final LongFilter longFilter; private final String fieldName; + // Name of the bucket selection strategy used by the most recent buildAggregations pass; reported by collectDebugInfo + private String resultSelectionStrategy; public NumericTermsAggregator( String name, @@ -118,6 +119,7 @@ public NumericTermsAggregator( this.fieldName = (this.valuesSource instanceof ValuesSource.Numeric.FieldData) ? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName() : null; + this.resultSelectionStrategy = ""; } @Override @@ -239,6 +241,11 @@ public void collectDebugInfo(BiConsumer add) { super.collectDebugInfo(add); add.accept("result_strategy", resultStrategy.describe()); add.accept("total_buckets", bucketOrds.size()); + add.accept("result_selection_strategy", resultSelectionStrategy); + } + + public String getResultSelectionStrategy() { + return resultSelectionStrategy; } /** @@ -255,42 +262,38 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws checkCancelled(); collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx]); long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - int size = (int) Math.min(bucketsInOrd, localBucketCountThresholds.getRequiredSize()); - PriorityQueue ordered = buildPriorityQueue(size); - B spare = null; BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (docCount < localBucketCountThresholds.getMinDocCount()) { - continue; - } - if (spare == null) { - spare = emptyBucketBuilder.get(); - } - updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } - // Get the top buckets - B[] bucketsForOrd = buildBuckets(ordered.size()); - topBucketsPerOrd[ordIdx] = bucketsForOrd; - if (isKeyOrder(order)) { - for (int b = ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd[ordIdx][b] = ordered.pop(); - otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount(); - } - } else { - // sorted buckets not needed as they will be sorted by key in buildResult() which is different from - // order in priority queue ordered - Iterator itr = ordered.iterator(); - for (int b = ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd[ordIdx][b] = itr.next(); - otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount(); - } - } + BucketSelectionStrategy strategy = BucketSelectionStrategy.determine( + size, + bucketsInOrd, + order, + partiallyBuiltBucketComparator, + context.bucketSelectionStrategyFactor() + ); + + BucketSelectionStrategy.SelectionInput selectionInput = new BucketSelectionStrategy.SelectionInput<>( + size, + bucketsInOrd, + ordsEnum, + emptyBucketBuilder, + localBucketCountThresholds, + ordIdx, + order, + this::buildPriorityQueue, + this::buildBuckets, + // BucketUpdateFunction declares IOException, so a failure propagates unwrapped instead of as a RuntimeException + (spare, ordsEnumParam, docCount) -> updateBucket(spare, ordsEnumParam, docCount), + NumericTermsAggregator.this::bucketDocCount, + partiallyBuiltBucketComparator + ); + + BucketSelectionStrategy.SelectionResult result = strategy.selectTopBuckets(selectionInput); + topBucketsPerOrd[ordIdx] = result.topBuckets; + otherDocCounts[ordIdx] = result.otherDocCount; + resultSelectionStrategy = result.actualStrategyUsed; } buildSubAggs(topBucketsPerOrd); diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java 
index ff17fb1525986..4eadd8817a5c3 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -56,6 +56,7 @@ import org.opensearch.search.RescoreDocIds; import org.opensearch.search.SearchExtBuilder; import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.BucketCollectorProcessor; @@ -538,6 +539,10 @@ public int cardinalityAggregationPruningThreshold() { return 0; } + public int bucketSelectionStrategyFactor() { + return SearchService.DEFAULT_BUCKET_SELECTION_STRATEGY_FACTOR; + } + public boolean keywordIndexOrDocValuesEnabled() { return false; } diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregatorTests.java index 4988e7141bb9c..074676b8bfd13 100644 --- a/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregatorTests.java +++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregatorTests.java @@ -36,16 +36,22 @@ import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import 
org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.search.aggregations.AggregationExecutionException; import org.opensearch.search.aggregations.AggregatorTestCase; +import org.opensearch.search.aggregations.BucketOrder; +import org.opensearch.search.aggregations.LeafBucketCollector; +import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.support.ValueType; import java.io.IOException; @@ -53,7 +59,9 @@ import java.util.List; import java.util.function.Consumer; +import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; public class NumericTermsAggregatorTests extends AggregatorTestCase { private static final String LONG_FIELD = "long"; @@ -159,6 +167,105 @@ public void testBadIncludeExclude() throws IOException { } + public void testNumericTermAggregatorForResultSelectionStrategy() throws IOException { + List dataSet = new ArrayList<>(); + for (long i = 0; i < 100; i++) { + dataSet.add(i); + } + + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + for (Long value : dataSet) { + document.add(new SortedNumericDocValuesField(LONG_FIELD, value)); + document.add(new LongPoint(LONG_FIELD, value)); + indexWriter.addDocument(document); + document.clear(); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + MappedFieldType longFieldType = new NumberFieldMapper.NumberFieldType(LONG_FIELD, NumberFieldMapper.NumberType.LONG); + + // Case 1: PriorityQueue selection, when buckets > 5*size (size=2, buckets=100) + TermsAggregationBuilder aggregationBuilder1 = new TermsAggregationBuilder("_name").field(LONG_FIELD).size(2); + 
aggregationBuilder1.userValueTypeHint(ValueType.NUMERIC); + aggregationBuilder1.order(org.opensearch.search.aggregations.BucketOrder.count(false)); // count desc + NumericTermsAggregator aggregator1 = createAggregatorWithCustomizableSearchContext( + new MatchAllDocsQuery(), + aggregationBuilder1, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + searchContext -> when(searchContext.bucketSelectionStrategyFactor()).thenReturn(5), + longFieldType + ); + collectDocuments(indexSearcher, aggregator1); + aggregator1.buildAggregations(new long[] { 0 }); + assertEquals("priority_queue", aggregator1.getResultSelectionStrategy()); + + // Case 2: QuickSelect selection, when buckets > size && buckets <= 5*size (size=20, buckets=100) + TermsAggregationBuilder aggregationBuilder2 = new TermsAggregationBuilder("_name").field(LONG_FIELD).size(20); + aggregationBuilder2.userValueTypeHint(ValueType.NUMERIC); + aggregationBuilder2.order(org.opensearch.search.aggregations.BucketOrder.count(false)); // count desc + NumericTermsAggregator aggregator2 = createAggregatorWithCustomizableSearchContext( + new MatchAllDocsQuery(), + aggregationBuilder2, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + searchContext -> when(searchContext.bucketSelectionStrategyFactor()).thenReturn(5), + longFieldType + ); + collectDocuments(indexSearcher, aggregator2); + aggregator2.buildAggregations(new long[] { 0 }); + assertEquals("quick_select", aggregator2.getResultSelectionStrategy()); + + // Case 3: Get All buckets when buckets <= size (size=110, buckets=100) + TermsAggregationBuilder aggregationBuilder3 = new TermsAggregationBuilder("_name").field(LONG_FIELD).size(110); + 
aggregationBuilder3.userValueTypeHint(ValueType.NUMERIC); + aggregationBuilder3.order(org.opensearch.search.aggregations.BucketOrder.count(false)); // count desc + NumericTermsAggregator aggregator3 = createAggregatorWithCustomizableSearchContext( + new MatchAllDocsQuery(), + aggregationBuilder3, + indexSearcher, + createIndexSettings(), + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + searchContext -> when(searchContext.bucketSelectionStrategyFactor()).thenReturn(5), + longFieldType + ); + collectDocuments(indexSearcher, aggregator3); + aggregator3.buildAggregations(new long[] { 0 }); + assertEquals("select_all", aggregator3.getResultSelectionStrategy()); + } + } + } + + public void testBucketSelectionStrategyFactorSetting() { + java.util.Comparator> mockComparator = (b1, b2) -> Long.compare(b2.getDocCount(), b1.getDocCount()); + + // Test with factor = 0 (should always use priority_queue) + BucketSelectionStrategy strategy0 = BucketSelectionStrategy.determine(20, 100L, BucketOrder.count(false), mockComparator, 0); + assertEquals(BucketSelectionStrategy.PRIORITY_QUEUE, strategy0); + + // default behavior + BucketSelectionStrategy strategy1 = BucketSelectionStrategy.determine(20, 100L, BucketOrder.count(false), mockComparator, 5); + assertEquals(BucketSelectionStrategy.QUICK_SELECT_OR_SELECT_ALL, strategy1); + + BucketSelectionStrategy strategy2 = BucketSelectionStrategy.determine(2, 100L, BucketOrder.count(false), mockComparator, 1); + assertEquals(BucketSelectionStrategy.PRIORITY_QUEUE, strategy2); + } + private void testSearchCase( Query query, List dataset, @@ -196,4 +303,16 @@ private void testSearchCase( } } + /** + * Helper method to collect all documents using the aggregator's leaf collector. + * This simulates the document collection phase that happens during normal aggregation. 
+ */ + private void collectDocuments(IndexSearcher searcher, NumericTermsAggregator aggregator) throws IOException { + for (LeafReaderContext ctx : searcher.getIndexReader().leaves()) { + LeafBucketCollector leafCollector = aggregator.getLeafCollector(ctx, LeafBucketCollector.NO_OP_COLLECTOR); + for (int docId = 0; docId < ctx.reader().maxDoc(); docId++) { + leafCollector.collect(docId, 0); // collect with bucket ordinal 0 (root bucket) + } + } + } }