Skip to content

Commit f0846d8

Browse files
committed
Add tests
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
1 parent ab3c4ab commit f0846d8

File tree

2 files changed

+1227
-13
lines changed

2 files changed

+1227
-13
lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/stream/StreamNumericTermsAggregator.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public class StreamNumericTermsAggregator extends TermsAggregator {
5858
private final ResultStrategy<?, ?> resultStrategy;
5959
private final ValuesSource.Numeric valuesSource;
6060
private final IncludeExclude.LongFilter longFilter;
61-
private long valueCount;
6261
private LongKeyedBucketOrds bucketOrds;
6362
private final CardinalityUpperBound cardinality;
6463

@@ -87,21 +86,17 @@ public StreamNumericTermsAggregator(
8786
@Override
8887
public void doReset() {
8988
super.doReset();
90-
// TODO: reset bucketOrds here
91-
valueCount = 0;
89+
Releasables.close(bucketOrds);
90+
bucketOrds = null;
9291
}
9392

9493
@Override
9594
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
95+
if (bucketOrds != null) {
96+
bucketOrds.close();
97+
}
9698
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
9799
SortedNumericDocValues values = resultStrategy.getValues(ctx);
98-
this.valueCount = values.docValueCount();
99-
if (docCounts == null) {
100-
this.docCounts = context.bigArrays().newLongArray(valueCount, true);
101-
} else {
102-
// TODO: check performance of grow vs creating a new one
103-
this.docCounts = context.bigArrays().grow(docCounts, valueCount);
104-
}
105100
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
106101
@Override
107102
public void collect(int doc, long owningBucketOrd) throws IOException {
@@ -115,9 +110,10 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
115110
long bucketOrdinal = bucketOrds.add(owningBucketOrd, val);
116111
if (bucketOrdinal < 0) { // already seen
117112
bucketOrdinal = -1 - bucketOrdinal;
113+
collectExistingBucket(sub, doc, bucketOrdinal);
114+
} else {
115+
collectBucket(sub, doc, bucketOrdinal);
118116
}
119-
// TODO: do we need to call #collectBucket actually?
120-
collectExistingBucket(sub, doc, bucketOrdinal);
121117
}
122118
previous = val;
123119
}
@@ -139,6 +135,13 @@ public abstract class ResultStrategy<R extends InternalAggregation, B extends In
139135
implements
140136
Releasable {
141137
private InternalAggregation[] buildAggregationsBatch(long[] owningBucketOrds) throws IOException {
138+
if (bucketOrds == null) { // no data collected
139+
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
140+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
141+
results[ordIdx] = buildEmptyResult();
142+
}
143+
return results;
144+
}
142145
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
143146
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
144147
long[] otherDocCounts = new long[owningBucketOrds.length];
@@ -643,7 +646,7 @@ public InternalAggregation buildEmptyAggregation() {
643646
public void collectDebugInfo(BiConsumer<String, Object> add) {
644647
super.collectDebugInfo(add);
645648
add.accept("result_strategy", resultStrategy.describe());
646-
add.accept("total_buckets", bucketOrds.size());
649+
add.accept("total_buckets", bucketOrds == null ? 0 : bucketOrds.size());
647650
}
648651

649652
@Override

0 commit comments

Comments
 (0)