Optimization in Numeric Terms Aggregation query for Large Bucket Counts #18702
Merged
rishabhmaurya
merged 22 commits into
opensearch-project:main
from
vinaykpud:num-term-agg-opt
Aug 7, 2025
Commits
ee5276a
optimize num agg using quick select for topN when applicable
rishabhmaurya 9a528c0
Updated the numeric term aggregation logic to select topN
vinaykpud 14738d8
Add changelog
vinaykpud 52f4c52
Updated the algorithm selection logic
vinaykpud 9f7c12d
Updated the comment
vinaykpud e124eb1
spotlessApply
vinaykpud 13c663b
Updated tests
vinaykpud 45b50b7
Added a feature flag for the implementation
vinaykpud f8a69e7
Added profile debug information
vinaykpud d50d0fe
Merge branch 'main' into num-term-agg-opt
vinaykpud 647c60c
use priority queue method for significant terms
vinaykpud ad0de9d
removed featureflag
vinaykpud f8a56d2
Refactored the selection strategy
vinaykpud b6b59ea
Merge branch 'main' into num-term-agg-opt
vinaykpud add62cc
added description comments
vinaykpud 4b0aa1b
Added missing java docs
vinaykpud 7191cad
Fixed comments in PR
vinaykpud f9ea290
Added tests case with proper assertions
vinaykpud f1e09d7
Updated strategyName logic
vinaykpud 506c92e
Added cluster settings for selection strategy
vinaykpud adda83c
Merge branch 'main' into num-term-agg-opt
vinaykpud 68e77e1
Fixed nit pick
vinaykpud
238 changes: 238 additions & 0 deletions
...rc/main/java/org/opensearch/search/aggregations/bucket/terms/BucketSelectionStrategy.java
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,238 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| package org.opensearch.search.aggregations.bucket.terms; | ||
|
|
||
| import org.apache.lucene.util.ArrayUtil; | ||
| import org.apache.lucene.util.PriorityQueue; | ||
| import org.opensearch.search.aggregations.BucketOrder; | ||
| import org.opensearch.search.aggregations.InternalMultiBucketAggregation; | ||
| import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds; | ||
| import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.Arrays; | ||
| import java.util.Comparator; | ||
| import java.util.Iterator; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; | ||
|
|
||
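| /** | ||
| * Strategy for selecting the top buckets of a terms aggregation from the collected bucket ordinals. | ||
| * {@link #PRIORITY_QUEUE} keeps the best {@code size} buckets in a bounded priority queue, while | ||
| * {@link #QUICK_SELECT_OR_SELECT_ALL} materializes every valid bucket and either partitions the | ||
| * array with quick select or returns all buckets when no selection is needed. | ||
| */ | ||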
| enum BucketSelectionStrategy { | ||
| PRIORITY_QUEUE { | ||
| @Override | ||
| public <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input) | ||
| throws IOException { | ||
| PriorityQueue<B> ordered = input.buildPriorityQueue.buildPriorityQueue(input.size); | ||
| B spare = null; | ||
| long otherDocCount = 0; | ||
|
|
||
| while (input.ordsEnum.next()) { | ||
| long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord()); | ||
| otherDocCount += docCount; | ||
| if (docCount < input.localBucketCountThresholds.getMinDocCount()) { | ||
| continue; | ||
| } | ||
| if (spare == null) { | ||
| spare = input.emptyBucketBuilder.get(); | ||
| } | ||
| input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount); | ||
| spare = ordered.insertWithOverflow(spare); | ||
| } | ||
|
|
||
| B[] topBuckets = input.bucketArrayBuilder.buildBuckets(ordered.size()); | ||
| if (isKeyOrder(input.order)) { | ||
| for (int b = ordered.size() - 1; b >= 0; --b) { | ||
| topBuckets[b] = ordered.pop(); | ||
| otherDocCount -= topBuckets[b].getDocCount(); | ||
| } | ||
| } else { | ||
| Iterator<B> itr = ordered.iterator(); | ||
| for (int b = ordered.size() - 1; b >= 0; --b) { | ||
| topBuckets[b] = itr.next(); | ||
| otherDocCount -= topBuckets[b].getDocCount(); | ||
| } | ||
| } | ||
|
|
||
| return new SelectionResult<>(topBuckets, otherDocCount, "priority_queue"); | ||
| } | ||
| }, | ||
|
|
||
| QUICK_SELECT_OR_SELECT_ALL { | ||
| @Override | ||
| public <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input) | ||
| throws IOException { | ||
| B[] bucketsForOrd = input.bucketArrayBuilder.buildBuckets((int) input.bucketsInOrd); | ||
| int validBucketCount = 0; | ||
| long otherDocCount = 0; | ||
|
|
||
| // Collect all valid buckets | ||
| while (input.ordsEnum.next()) { | ||
| long docCount = input.bucketDocCountFunction.bucketDocCount(input.ordsEnum.ord()); | ||
| otherDocCount += docCount; | ||
| if (docCount < input.localBucketCountThresholds.getMinDocCount()) { | ||
| continue; | ||
| } | ||
|
|
||
| B spare = input.emptyBucketBuilder.get(); | ||
| input.bucketUpdateFunction.updateBucket(spare, input.ordsEnum, docCount); | ||
| bucketsForOrd[validBucketCount++] = spare; | ||
| } | ||
|
|
||
| B[] topBuckets; | ||
| String actualStrategy; | ||
| if (validBucketCount > input.size) { | ||
| ArrayUtil.select( | ||
| bucketsForOrd, | ||
| 0, | ||
| validBucketCount, | ||
| input.size, | ||
| (b1, b2) -> input.partiallyBuiltBucketComparator.compare((InternalTerms.Bucket<?>) b1, (InternalTerms.Bucket<?>) b2) | ||
| ); | ||
| topBuckets = Arrays.copyOf(bucketsForOrd, input.size); | ||
| for (int b = 0; b < input.size; b++) { | ||
| otherDocCount -= topBuckets[b].getDocCount(); | ||
| } | ||
| actualStrategy = "quick_select"; | ||
| } else { | ||
| // Return all buckets (no selection needed) | ||
| topBuckets = Arrays.copyOf(bucketsForOrd, validBucketCount); | ||
| otherDocCount = 0L; | ||
| actualStrategy = "select_all"; | ||
| } | ||
|
|
||
| return new SelectionResult<>(topBuckets, otherDocCount, actualStrategy); | ||
| } | ||
| }; | ||
|
|
||
| public static BucketSelectionStrategy determine( | ||
| int size, | ||
| long bucketsInOrd, | ||
| BucketOrder order, | ||
| Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator, | ||
| int factor | ||
| ) { | ||
| /* | ||
| The strategy is chosen by comparing size * factor with bucketsInOrd, where factor is a configurable threshold. | ||
| With factor = 5 the cut-over is at 20% of bucketsInOrd: | ||
| case 1: size * factor < bucketsInOrd (size is less than 20% of bucketsInOrd): PRIORITY_QUEUE | ||
| case 2: size * factor >= bucketsInOrd and there are more valid buckets than size: QUICK_SELECT | ||
| case 3: size covers every valid bucket: return all buckets | ||
| Cases 2 and 3 are both handled by QUICK_SELECT_OR_SELECT_ALL. | ||
|
|
||
| Regardless of the threshold, the original PRIORITY_QUEUE approach is kept | ||
| if isKeyOrder(order) is true or partiallyBuiltBucketComparator is null (the significant terms aggregation case). | ||
|
|
||
| If factor is 0, PRIORITY_QUEUE is always used, since 0 < bucketsInOrd holds whenever there is at least one bucket. | ||
| */ | ||
| if (((long) size * factor < bucketsInOrd) || isKeyOrder(order) || partiallyBuiltBucketComparator == null) { | ||
| return PRIORITY_QUEUE; | ||
| } else { | ||
| return QUICK_SELECT_OR_SELECT_ALL; | ||
| } | ||
| } | ||
|
|
||
| public abstract <B extends InternalMultiBucketAggregation.InternalBucket> SelectionResult<B> selectTopBuckets(SelectionInput<B> input) | ||
| throws IOException; | ||
|
|
||
| /** | ||
| * Represents the inputs for strategy execution to select buckets | ||
| */ | ||
| public static class SelectionInput<B extends InternalMultiBucketAggregation.InternalBucket> { | ||
| public final int size; | ||
| public final long bucketsInOrd; | ||
| public final BucketOrdsEnum ordsEnum; | ||
| public final Supplier<B> emptyBucketBuilder; | ||
| public final LocalBucketCountThresholds localBucketCountThresholds; | ||
| public final int ordIdx; | ||
| public final BucketOrder order; | ||
| public final PriorityQueueBuilder<B> buildPriorityQueue; | ||
| public final BucketArrayBuilder<B> bucketArrayBuilder; | ||
| public final BucketUpdateFunction<B> bucketUpdateFunction; | ||
| public final BucketDocCountFunction bucketDocCountFunction; | ||
| public final Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator; | ||
|
|
||
| public SelectionInput( | ||
| int size, | ||
| long bucketsInOrd, | ||
| BucketOrdsEnum ordsEnum, | ||
| Supplier<B> emptyBucketBuilder, | ||
| LocalBucketCountThresholds localBucketCountThresholds, | ||
| int ordIdx, | ||
| BucketOrder order, | ||
| PriorityQueueBuilder<B> buildPriorityQueue, | ||
| BucketArrayBuilder<B> bucketArrayBuilder, | ||
| BucketUpdateFunction<B> bucketUpdateFunction, | ||
| BucketDocCountFunction bucketDocCountFunction, | ||
| Comparator<InternalTerms.Bucket<?>> partiallyBuiltBucketComparator | ||
| ) { | ||
| this.size = size; | ||
| this.bucketsInOrd = bucketsInOrd; | ||
| this.ordsEnum = ordsEnum; | ||
| this.emptyBucketBuilder = emptyBucketBuilder; | ||
| this.localBucketCountThresholds = localBucketCountThresholds; | ||
| this.ordIdx = ordIdx; | ||
| this.order = order; | ||
| this.buildPriorityQueue = buildPriorityQueue; | ||
| this.bucketArrayBuilder = bucketArrayBuilder; | ||
| this.bucketUpdateFunction = bucketUpdateFunction; | ||
| this.bucketDocCountFunction = bucketDocCountFunction; | ||
| this.partiallyBuiltBucketComparator = partiallyBuiltBucketComparator; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Represents the result of executing a strategy to select buckets | ||
| */ | ||
| public static class SelectionResult<B extends InternalMultiBucketAggregation.InternalBucket> { | ||
| public final B[] topBuckets; | ||
| public final long otherDocCount; | ||
| public final String actualStrategyUsed; | ||
|
|
||
| public SelectionResult(B[] topBuckets, long otherDocCount, String actualStrategyUsed) { | ||
| this.topBuckets = topBuckets; | ||
| this.otherDocCount = otherDocCount; | ||
| this.actualStrategyUsed = actualStrategyUsed; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Interface for bucketDocCount method | ||
| */ | ||
| @FunctionalInterface | ||
| public interface BucketDocCountFunction { | ||
| long bucketDocCount(long ord) throws IOException; | ||
| } | ||
|
|
||
| /** | ||
| * Interface for updateBucket method | ||
| */ | ||
| @FunctionalInterface | ||
| public interface BucketUpdateFunction<B> { | ||
| void updateBucket(B spare, BucketOrdsEnum ordsEnum, long docCount) throws IOException; | ||
| } | ||
|
|
||
| /** | ||
| * Interface for buildBuckets method | ||
| */ | ||
| @FunctionalInterface | ||
| public interface BucketArrayBuilder<B> { | ||
| B[] buildBuckets(int size); | ||
| } | ||
|
|
||
| /** | ||
| * Interface for buildPriorityQueue method | ||
| */ | ||
| @FunctionalInterface | ||
| public interface PriorityQueueBuilder<B> { | ||
| PriorityQueue<B> buildPriorityQueue(int size); | ||
| } | ||
| } |
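The selection rule in `determine` can be tried in isolation. The following is a minimal sketch, not the PR's implementation: `SelectionSketch`, `choose`, and `otherDocCount` are hypothetical names, plain `long` doc counts stand in for buckets, and `java.util.PriorityQueue` stands in for Lucene's `PriorityQueue`. It assumes only the condition shown in the diff (`size * factor < bucketsInOrd`, key order, or a missing comparator) and the way `otherDocCount` is derived as the total minus the doc counts of the selected buckets.

```java
import java.util.PriorityQueue;

class SelectionSketch {
    // Hypothetical stand-in for BucketSelectionStrategy's two constants.
    enum Strategy { PRIORITY_QUEUE, QUICK_SELECT_OR_SELECT_ALL }

    // Same condition as determine(): keep the priority queue when size is small
    // relative to the bucket count, when ordering by key, or when no comparator exists.
    static Strategy choose(int size, long bucketsInOrd, int factor, boolean keyOrder, boolean hasComparator) {
        // Widen to long before multiplying so size * factor cannot overflow int.
        if ((long) size * factor < bucketsInOrd || keyOrder || !hasComparator) {
            return Strategy.PRIORITY_QUEUE;
        }
        return Strategy.QUICK_SELECT_OR_SELECT_ALL;
    }

    // Sum of the doc counts that do NOT make it into the top n,
    // computed with a bounded min-heap (assumption: ordering by doc count descending).
    static long otherDocCount(long[] docCounts, int n) {
        PriorityQueue<Long> heap = new PriorityQueue<>();
        long total = 0;
        for (long count : docCounts) {
            total += count;
            heap.offer(count);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest so only the n largest remain
            }
        }
        long top = 0;
        for (long count : heap) {
            top += count;
        }
        return total - top;
    }

    public static void main(String[] args) {
        System.out.println(choose(10, 1000, 5, false, true)); // 50 < 1000
        System.out.println(choose(10, 40, 5, false, true));   // 50 >= 40
        System.out.println(otherDocCount(new long[] { 5, 1, 9, 3 }, 2));
    }
}
```

With a factor of 5, a request for 10 buckets out of 1000 stays on the priority queue, while 10 out of 40 switches to the array-based path. Passing `factor = 0` keeps the priority queue for any non-empty set of buckets.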