Skip to content

Commit ab3c4ab

Browse files
committed
Expand streaming aggregations to numeric terms
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
1 parent efecb34 commit ab3c4ab

14 files changed

+758
-59
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.io.IOException;
5454
import java.util.AbstractList;
5555
import java.util.ArrayList;
56-
import java.util.Arrays;
5756
import java.util.Iterator;
5857
import java.util.List;
5958
import java.util.Map;
@@ -260,23 +259,6 @@ public int size() {
260259
return result;
261260
}
262261

263-
/**
264-
* Build the sub aggregation results for a list of buckets and set them on
265-
* the buckets. This is usually used by aggregations that are selective
266-
* in which bucket they build. They use some mechanism of selecting a list
267-
* of buckets to build use this method to "finish" building the results.
268-
* @param buckets the buckets to finish building
269-
* @param bucketToOrd how to convert a bucket into an ordinal
270-
* @param setAggs how to set the sub-aggregation results on a bucket
271-
*/
272-
protected final <B> void buildSubAggsForBuckets(B[] buckets, ToLongFunction<B> bucketToOrd, BiConsumer<B, InternalAggregations> setAggs)
273-
throws IOException {
274-
InternalAggregations[] results = buildSubAggsForBuckets(Arrays.stream(buckets).mapToLong(bucketToOrd).toArray());
275-
for (int i = 0; i < buckets.length; i++) {
276-
setAggs.accept(buckets[i], results[i]);
277-
}
278-
}
279-
280262
/**
281263
* Build the sub aggregation results for a list of buckets and set them on
282264
* the buckets. This is usually used by aggregations that are selective

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@
5151
*
5252
* @opensearch.internal
5353
*/
54-
abstract class AbstractStringTermsAggregator extends TermsAggregator {
54+
public abstract class AbstractStringTermsAggregator extends TermsAggregator {
5555

5656
protected final boolean showTermDocCountError;
5757

58-
AbstractStringTermsAggregator(
58+
protected AbstractStringTermsAggregator(
5959
String name,
6060
AggregatorFactories factories,
6161
SearchContext context,

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/DoubleTerms.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,10 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
5858
*
5959
* @opensearch.internal
6060
*/
61-
static class Bucket extends InternalTerms.Bucket<Bucket> {
62-
double term;
61+
public static class Bucket extends InternalTerms.Bucket<Bucket> {
62+
public double term;
6363

64-
Bucket(
64+
public Bucket(
6565
double term,
6666
long docCount,
6767
InternalAggregations aggregations,

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
409409
*
410410
* @opensearch.internal
411411
*/
412-
static class OrdBucket extends InternalTerms.Bucket<OrdBucket> {
412+
public static class OrdBucket extends InternalTerms.Bucket<OrdBucket> {
413413
long globalOrd;
414414

415415
OrdBucket(boolean showDocCountError, DocValueFormat format) {

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ public interface Reader<B extends Bucket<B>> {
8282
B read(StreamInput in, long subsetSize, long supersetSize, DocValueFormat format) throws IOException;
8383
}
8484

85-
long subsetDf;
85+
public long subsetDf;
8686
long subsetSize;
87-
long supersetDf;
87+
public long supersetDf;
8888
long supersetSize;
89-
long bucketOrd;
89+
public long bucketOrd;
9090
double score;
91-
protected InternalAggregations aggregations;
91+
public InternalAggregations aggregations;
9292
final transient DocValueFormat format;
9393

9494
protected Bucket(
@@ -139,7 +139,7 @@ public long getSubsetSize() {
139139
// TODO we should refactor to remove this, since buckets should be immutable after they are generated.
140140
// This can lead to confusing bugs if the bucket is re-created (via createBucket() or similar) without
141141
// the score
142-
void updateScore(SignificanceHeuristic significanceHeuristic) {
142+
public void updateScore(SignificanceHeuristic significanceHeuristic) {
143143
score = significanceHeuristic.getScore(subsetDf, subsetSize, supersetDf, supersetSize);
144144
}
145145

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,11 @@ public interface Reader<B extends Bucket<B>> {
104104
B read(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException;
105105
}
106106

107-
long bucketOrd;
107+
public long bucketOrd;
108108

109-
protected long docCount;
109+
public long docCount;
110110
protected long docCountError;
111-
protected InternalAggregations aggregations;
111+
public InternalAggregations aggregations;
112112
protected final boolean showDocCountError;
113113
protected final DocValueFormat format;
114114

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/LongTerms.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
5959
* @opensearch.internal
6060
*/
6161
public static class Bucket extends InternalTerms.Bucket<Bucket> {
62-
long term;
62+
public long term;
6363

6464
public Bucket(
6565
long term,

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificanceLookup.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,18 @@
6363
*
6464
* @opensearch.internal
6565
*/
66-
class SignificanceLookup {
66+
public class SignificanceLookup {
6767
/**
6868
* Lookup frequencies for {@link BytesRef} terms.
6969
*/
70-
interface BackgroundFrequencyForBytes extends Releasable {
70+
public interface BackgroundFrequencyForBytes extends Releasable {
7171
long freq(BytesRef term) throws IOException;
7272
}
7373

7474
/**
7575
* Lookup frequencies for {@code long} terms.
7676
*/
77-
interface BackgroundFrequencyForLong extends Releasable {
77+
public interface BackgroundFrequencyForLong extends Releasable {
7878
long freq(long term) throws IOException;
7979
}
8080

@@ -103,7 +103,7 @@ interface BackgroundFrequencyForLong extends Releasable {
103103
/**
104104
* Get the number of docs in the superset.
105105
*/
106-
long supersetSize() {
106+
public long supersetSize() {
107107
return supersetNumDocs;
108108
}
109109

@@ -155,7 +155,7 @@ private long getBackgroundFrequency(BytesRef term) throws IOException {
155155
/**
156156
* Get the background frequency of a {@code long} term.
157157
*/
158-
BackgroundFrequencyForLong longLookup(BigArrays bigArrays, CardinalityUpperBound cardinality) {
158+
public BackgroundFrequencyForLong longLookup(BigArrays bigArrays, CardinalityUpperBound cardinality) {
159159
if (cardinality == CardinalityUpperBound.ONE) {
160160
return new BackgroundFrequencyForLong() {
161161
@Override

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/SignificantLongTerms.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ public class SignificantLongTerms extends InternalMappedSignificantTerms<Signifi
5656
*
5757
* @opensearch.internal
5858
*/
59-
static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
59+
public static class Bucket extends InternalSignificantTerms.Bucket<Bucket> {
6060

61-
long term;
61+
public long term;
6262

63-
Bucket(
63+
public Bucket(
6464
long subsetDf,
6565
long subsetSize,
6666
long supersetDf,

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import org.opensearch.search.aggregations.bucket.BucketUtils;
5151
import org.opensearch.search.aggregations.bucket.terms.NumericTermsAggregator.ResultStrategy;
5252
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator.BucketCountThresholds;
53+
import org.opensearch.search.aggregations.bucket.terms.stream.StreamNumericTermsAggregator;
54+
import org.opensearch.search.aggregations.bucket.terms.stream.StreamStringTermsAggregator;
5355
import org.opensearch.search.aggregations.support.CoreValuesSourceType;
5456
import org.opensearch.search.aggregations.support.ValuesSource;
5557
import org.opensearch.search.aggregations.support.ValuesSourceAggregatorFactory;
@@ -121,7 +123,7 @@ public Aggregator build(
121123
// if user doesn't provide execution mode, and using stream search
122124
// we use stream aggregation
123125
if (context.isStreamSearch()) {
124-
return createStreamAggregator(
126+
return createStreamStringTermsAggregator(
125127
name,
126128
factories,
127129
valuesSource,
@@ -207,7 +209,6 @@ public Aggregator build(
207209
+ "include/exclude clauses used to filter numeric fields"
208210
);
209211
}
210-
211212
if (subAggCollectMode == null) {
212213
subAggCollectMode = pickSubAggCollectMode(factories, bucketCountThresholds.getShardSize(), -1, context);
213214
}
@@ -231,6 +232,23 @@ public Aggregator build(
231232
}
232233
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
233234
}
235+
if (context.isStreamSearch()) {
236+
return createStreamNumericTermsAggregator(
237+
name,
238+
factories,
239+
numericValuesSource,
240+
format,
241+
order,
242+
bucketCountThresholds,
243+
context,
244+
parent,
245+
longFilter,
246+
includeExclude,
247+
showTermDocCountError,
248+
cardinality,
249+
metadata
250+
);
251+
}
234252
return new NumericTermsAggregator(
235253
name,
236254
factories,
@@ -578,7 +596,7 @@ public String toString() {
578596
}
579597
}
580598

581-
static Aggregator createStreamAggregator(
599+
static Aggregator createStreamStringTermsAggregator(
582600
String name,
583601
AggregatorFactories factories,
584602
ValuesSource valuesSource,
@@ -610,6 +628,55 @@ static Aggregator createStreamAggregator(
610628
}
611629
}
612630

631+
static Aggregator createStreamNumericTermsAggregator(
632+
String name,
633+
AggregatorFactories factories,
634+
ValuesSource.Numeric valuesSource,
635+
DocValueFormat format,
636+
BucketOrder order,
637+
BucketCountThresholds bucketCountThresholds,
638+
SearchContext aggregationContext,
639+
Aggregator parent,
640+
IncludeExclude.LongFilter longFilter,
641+
IncludeExclude includeExclude,
642+
boolean showTermDocCountError,
643+
CardinalityUpperBound cardinality,
644+
Map<String, Object> metadata
645+
) throws IOException {
646+
Function<StreamNumericTermsAggregator, StreamNumericTermsAggregator.ResultStrategy<?, ?>> resultStrategy;
647+
if (valuesSource.isFloatingPoint()) {
648+
if (includeExclude != null) {
649+
longFilter = includeExclude.convertToDoubleFilter();
650+
}
651+
resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError);
652+
} else if (valuesSource.isBigInteger()) {
653+
if (includeExclude != null) {
654+
longFilter = includeExclude.convertToDoubleFilter();
655+
}
656+
resultStrategy = agg -> agg.new UnsignedLongTermsResults(showTermDocCountError);
657+
} else {
658+
if (includeExclude != null) {
659+
longFilter = includeExclude.convertToLongFilter(format);
660+
}
661+
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
662+
}
663+
return new StreamNumericTermsAggregator(
664+
name,
665+
factories,
666+
resultStrategy,
667+
valuesSource,
668+
format,
669+
order,
670+
bucketCountThresholds,
671+
aggregationContext,
672+
parent,
673+
SubAggCollectionMode.DEPTH_FIRST,
674+
longFilter,
675+
cardinality,
676+
metadata
677+
);
678+
}
679+
613680
@Override
614681
protected boolean supportsConcurrentSegmentSearch() {
615682
return true;

0 commit comments

Comments
 (0)