Skip to content

Commit 2f38aeb

Browse files
authored
Save memory when numeric terms agg is not top (elastic#55873) (elastic#56454)
Right now all implementations of the `terms` agg allocate a new `Aggregator` per bucket. This uses a bunch of memory. Exactly how much isn't clear, but each `Aggregator` ends up making its own objects to read doc values which have non-trivial buffers. And it forces all of its sub-aggregations to do the same. We allocate a new `Aggregator` per bucket for two reasons: 1. We didn't have an appropriate data structure to track the sub-ordinals of each parent bucket. 2. You can only make a single call to `runDeferredCollections(long...)` per `Aggregator` which was the only way to delay collection of sub-aggregations. This change switches the method that builds aggregation results from building them one at a time to building all of the results for the entire aggregator at the same time. It also adds a fairly simplistic data structure to track the sub-ordinals for `long`-keyed buckets. It uses both of those to power numeric `terms` aggregations and removes the per-bucket allocation of their `Aggregator`. This fairly substantially reduces memory consumption of numeric `terms` aggregations that are not the "top level", especially when those aggregations contain many sub-aggregations. It also is a pretty big speedup, especially when the aggregation is under a non-selective aggregation like the `date_histogram`. I picked numeric `terms` aggregations because those have the simplest implementation. At least, I could kind of fit it in my head. And I haven't fully understood the "bytes"-based terms aggregations, but I imagine I'll be able to make similar optimizations to them in follow-up changes.
1 parent 0fb9bc5 commit 2f38aeb

File tree

64 files changed

+1388
-516
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1388
-516
lines changed

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public ChildrenToParentAggregator(String name, AggregatorFactories factories,
4545
}
4646

4747
@Override
48-
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
49-
return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
50-
bucketAggregations(owningBucketOrdinal), metadata());
48+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
49+
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
50+
new InternalParent(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
5151
}
5252

5353
@Override

modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ public ParentToChildrenAggregator(String name, AggregatorFactories factories,
4141
}
4242

4343
@Override
44-
public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
45-
return new InternalChildren(name, bucketDocCount(owningBucketOrdinal),
46-
bucketAggregations(owningBucketOrdinal), metadata());
44+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
45+
return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
46+
new InternalChildren(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
4747
}
4848

4949
@Override

server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void execute(SearchContext context) {
125125
for (Aggregator aggregator : context.aggregations().aggregators()) {
126126
try {
127127
aggregator.postCollection();
128-
aggregations.add(aggregator.buildAggregation(0));
128+
aggregations.add(aggregator.buildTopLevel());
129129
} catch (IOException e) {
130130
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
131131
}
@@ -137,5 +137,4 @@ public void execute(SearchContext context) {
137137
context.aggregations(null);
138138
context.queryCollectors().remove(AggregationPhase.class);
139139
}
140-
141140
}

server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,24 @@ public interface BucketComparator {
152152
}
153153

154154
/**
155-
* Build an aggregation for data that has been collected into {@code bucket}.
155+
* Build the results of this aggregation.
156+
* @param owningBucketOrds the ordinals of the buckets that we want to
157+
* collect from this aggregation
158+
* @return the results for each ordinal, in the same order as the array
159+
* of ordinals
156160
*/
157-
public abstract InternalAggregation buildAggregation(long bucket) throws IOException;
161+
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
162+
163+
/**
164+
* Build the result of this aggregation if it is at the "top level"
165+
* of the aggregation tree. If, instead, it is a sub-aggregation of
166+
* another aggregation then the aggregation that contains it will call
167+
* {@link #buildAggregations(long[])}.
168+
*/
169+
public final InternalAggregation buildTopLevel() throws IOException {
170+
assert parent() == null;
171+
return buildAggregations(new long[] {0})[0];
172+
}
158173

159174
/**
160175
* Build an empty aggregation.

server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactory.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,27 @@ public void collect(int doc, long bucket) throws IOException {
146146
}
147147

148148
@Override
149-
public InternalAggregation buildAggregation(long bucket) throws IOException {
150-
if (bucket < aggregators.size()) {
151-
Aggregator aggregator = aggregators.get(bucket);
152-
if (aggregator != null) {
153-
return aggregator.buildAggregation(0);
149+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
150+
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
151+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
152+
if (owningBucketOrds[ordIdx] < aggregators.size()) {
153+
Aggregator aggregator = aggregators.get(owningBucketOrds[ordIdx]);
154+
if (aggregator != null) {
155+
/*
156+
* This is the same call as buildTopLevel but since
157+
* this aggregator may not be the top level we don't
158+
* call that method here. It'd be weird sounding. And
159+
* it'd trip assertions. Both bad.
160+
*/
161+
results[ordIdx] = aggregator.buildAggregations(new long [] {0})[0];
162+
} else {
163+
results[ordIdx] = buildEmptyAggregation();
164+
}
165+
} else {
166+
results[ordIdx] = buildEmptyAggregation();
154167
}
155168
}
156-
return buildEmptyAggregation();
169+
return results;
157170
}
158171

159172
@Override
@@ -232,7 +245,9 @@ public AggregatorFactory getParent() {
232245
* Utility method. Given an {@link AggregatorFactory} that creates
233246
* {@link Aggregator}s that only know how to collect bucket {@code 0}, this
234247
* returns an aggregator that can collect any bucket.
248+
* @deprecated implement the aggregator to handle many owning buckets
235249
*/
250+
@Deprecated
236251
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory, final SearchContext searchContext,
237252
final Aggregator parent) throws IOException {
238253
final Aggregator first = factory.create(searchContext, parent, true);

server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,5 +363,4 @@ public double sortValue(AggregationPath.PathElement head, Iterator<AggregationPa
363363
// subclasses will override this with a real implementation if you can sort on a descendant
364364
throw new IllegalArgumentException("Can't sort by a descendant of a [" + getType() + "] aggregation [" + head + "]");
365365
}
366-
367366
}

server/src/main/java/org/elasticsearch/search/aggregations/LeafBucketCollector.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.lucene.search.LeafCollector;
2323
import org.apache.lucene.search.Scorable;
24+
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
2425

2526
import java.io.IOException;
2627
import java.util.stream.Stream;
@@ -73,9 +74,33 @@ public void collect(int doc, long bucket) throws IOException {
7374
}
7475

7576
/**
76-
* Collect the given doc in the given bucket.
77+
* Collect the given {@code doc} in the bucket owned by
78+
* {@code owningBucketOrd}.
79+
* <p>
80+
* The implementation of this method for metric aggregations is generally
81+
* something along the lines of
82+
* <pre>{@code
83+
* array[owningBucketOrd] += loadValueFromDoc(doc)
84+
* }</pre>
85+
* <p>Bucket aggregations have more trouble because their job is to
86+
* <strong>make</strong> new ordinals. So their implementation generally
87+
* looks kind of like
88+
* <pre>{@code
89+
* long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
90+
* collectBucket(doc, myBucketOrd);
91+
* }</pre>
92+
* <p>
93+
* Some bucket aggregations "know" how many ordinals each owning ordinal
94+
* needs so they can map "densely". The {@code range} aggregation, for
95+
* example, can perform this mapping with something like:
96+
* <pre>{@code
97+
* return rangeCount * owningBucketOrd + matchingRange(value);
98+
* }</pre>
99+
* Other aggregations don't know how many buckets will fall into any
100+
* particular owning bucket. The {@code terms} aggregation, for example,
101+
* uses {@link LongKeyedBucketOrds} which amounts to a hash lookup.
77102
*/
78-
public abstract void collect(int doc, long bucket) throws IOException;
103+
public abstract void collect(int doc, long owningBucketOrd) throws IOException;
79104

80105
@Override
81106
public final void collect(int doc) throws IOException {

server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
/**
3535
* An aggregation service that creates instances of {@link MultiBucketConsumer}.
3636
* The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
37-
* in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
37+
* in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
3838
* The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
3939
*/
4040
public class MultiBucketConsumerService {
@@ -94,7 +94,7 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
9494
* An {@link IntConsumer} that throws a {@link TooManyBucketsException}
9595
* when the sum of the provided values is above the limit (`search.max_buckets`).
9696
* It is used by aggregators to limit the number of buckets created during
97-
* {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
97+
* {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
9898
*/
9999
public static class MultiBucketConsumer implements IntConsumer {
100100
private final int limit;

server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext reader, Leaf
4848
}
4949

5050
@Override
51-
public final InternalAggregation buildAggregation(long owningBucketOrdinal) {
52-
return buildEmptyAggregation();
51+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
52+
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
53+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
54+
results[ordIdx] = buildEmptyAggregation();
55+
}
56+
return results;
5357
}
5458
}

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,10 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
154154
throw new IllegalStateException("Already been replayed");
155155
}
156156

157-
final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
158-
for (long bucket : selectedBuckets) {
159-
hash.add(bucket);
157+
this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
158+
for (long ord : selectedBuckets) {
159+
this.selectedBuckets.add(ord);
160160
}
161-
this.selectedBuckets = hash;
162161

163162
boolean needsScores = scoreMode().needsScores();
164163
Weight weight = null;
@@ -185,7 +184,7 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
185184
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
186185
doc += docDeltaIterator.next();
187186
final long bucket = buckets.next();
188-
final long rebasedBucket = hash.find(bucket);
187+
final long rebasedBucket = this.selectedBuckets.find(bucket);
189188
if (rebasedBucket != -1) {
190189
if (needsScores) {
191190
if (scoreIt.docID() < doc) {
@@ -213,19 +212,20 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
213212
public Aggregator wrap(final Aggregator in) {
214213

215214
return new WrappedAggregator(in) {
216-
217215
@Override
218-
public InternalAggregation buildAggregation(long bucket) throws IOException {
216+
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
219217
if (selectedBuckets == null) {
220218
throw new IllegalStateException("Collection has not been replayed yet.");
221219
}
222-
final long rebasedBucket = selectedBuckets.find(bucket);
223-
if (rebasedBucket == -1) {
224-
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
220+
long[] rebasedOrds = new long[owningBucketOrds.length];
221+
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
222+
rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]);
223+
if (rebasedOrds[ordIdx] == -1) {
224+
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
225+
}
225226
}
226-
return in.buildAggregation(rebasedBucket);
227+
return in.buildAggregations(rebasedOrds);
227228
}
228-
229229
};
230230
}
231231

0 commit comments

Comments
 (0)