Save memory when numeric terms agg is not top (#55873) (#56454)
Right now all implementations of the `terms` agg allocate a new
`Aggregator` per bucket. This uses a bunch of memory. Exactly how much
isn't clear, but each `Aggregator` ends up making its own objects to read
doc values which have non-trivial buffers. And it forces all of its
sub-aggregations to do the same. We allocate a new `Aggregator` per
bucket for two reasons:

1. We didn't have an appropriate data structure to track the
   sub-ordinals of each parent bucket.
2. You can only make a single call to `runDeferredCollections(long...)`
   per `Aggregator` which was the only way to delay collection of
   sub-aggregations.

This change switches the method that builds aggregation results from
building them one at a time to building all of the results for the
entire aggregator at the same time.

It also adds a fairly simplistic data structure to track the sub-ordinals
for `long`-keyed buckets.
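
To make that concrete, something like this toy captures the idea. This is
not the committed code: the class below is hypothetical, and the real
`LongKeyedBucketOrds` is built on `LongHash` and `BigArrays` so it never
boxes.

import java.util.HashMap;
import java.util.Map;

// Toy model: assign each (owningBucketOrd, value) pair a dense, stable
// ordinal so a single Aggregator instance can keep per-bucket state in
// flat arrays indexed by that ordinal.
class ToyLongKeyedBucketOrds {
    private record Key(long owningBucketOrd, long value) {}

    private final Map<Key, Long> ords = new HashMap<>();

    /** Ordinal for the pair, allocating the next dense ordinal on first sight. */
    long add(long owningBucketOrd, long value) {
        return ords.computeIfAbsent(new Key(owningBucketOrd, value), k -> (long) ords.size());
    }

    /** Number of ordinals allocated so far. */
    long size() {
        return ords.size();
    }
}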

It uses both of those to power numeric `terms` aggregations and removes
the per-bucket allocation of their `Aggregator`. This fairly
substantially reduces memory consumption of numeric `terms` aggregations
that are not the "top level", especially when those aggregations contain
many sub-aggregations. It is also a pretty big speed-up, especially when
the aggregation is under a non-selective aggregation like
the `date_histogram`.

I picked numeric `terms` aggregations because those have the simplest
implementation. At least, I could kind of fit it in my head. And I
haven't fully understood the "bytes"-based terms aggregations, but I
imagine I'll be able to make similar optimizations to them in follow-up
changes.
nik9000 authored May 9, 2020
1 parent 0fb9bc5 commit 2f38aeb
Showing 64 changed files with 1,388 additions and 516 deletions.
@@ -45,9 +45,9 @@ public ChildrenToParentAggregator(String name, AggregatorFactories factories,
     }
 
     @Override
-    public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
-        return new InternalParent(name, bucketDocCount(owningBucketOrdinal),
-            bucketAggregations(owningBucketOrdinal), metadata());
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
+            new InternalParent(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
     }
 
     @Override
@@ -41,9 +41,9 @@ public ParentToChildrenAggregator(String name, AggregatorFactories factories,
     }
 
     @Override
-    public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOException {
-        return new InternalChildren(name, bucketDocCount(owningBucketOrdinal),
-            bucketAggregations(owningBucketOrdinal), metadata());
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        return buildAggregationsForSingleBucket(owningBucketOrds, (owningBucketOrd, subAggregationResults) ->
+            new InternalChildren(name, bucketDocCount(owningBucketOrd), subAggregationResults, metadata()));
     }
 
     @Override
@@ -125,7 +125,7 @@ public void execute(SearchContext context) {
         for (Aggregator aggregator : context.aggregations().aggregators()) {
             try {
                 aggregator.postCollection();
-                aggregations.add(aggregator.buildAggregation(0));
+                aggregations.add(aggregator.buildTopLevel());
             } catch (IOException e) {
                 throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
             }
@@ -137,5 +137,4 @@ public void execute(SearchContext context) {
         context.aggregations(null);
         context.queryCollectors().remove(AggregationPhase.class);
     }
-
 }
@@ -152,9 +152,24 @@ public interface BucketComparator {
     }
 
     /**
-     * Build an aggregation for data that has been collected into {@code bucket}.
+     * Build the results of this aggregation.
+     * @param owningBucketOrds the ordinals of the buckets that we want to
+     *                         collect from this aggregation
+     * @return the results for each ordinal, in the same order as the array
+     *         of ordinals
      */
-    public abstract InternalAggregation buildAggregation(long bucket) throws IOException;
+    public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
+
+    /**
+     * Build the result of this aggregation if it is at the "top level"
+     * of the aggregation tree. If, instead, it is a sub-aggregation of
+     * another aggregation then the aggregation that contains it will call
+     * {@link #buildAggregations(long[])}.
+     */
+    public final InternalAggregation buildTopLevel() throws IOException {
+        assert parent() == null;
+        return buildAggregations(new long[] {0})[0];
+    }
 
     /**
      * Build an empty aggregation.
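
To sketch how this pair of methods is meant to be used (only
`buildAggregations` and `buildTopLevel` come from this change; the helper
class below is hypothetical and assumes the Elasticsearch server classes
from this commit are on the classpath):

import java.io.IOException;

import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;

// Sketch: a parent aggregation asks a sub-aggregator for the results of all
// of its owning buckets in one call instead of once per bucket.
class BuildAggregationsUsage {
    static InternalAggregation[] buildForParentBuckets(Aggregator sub, int bucketCount) throws IOException {
        long[] owningBucketOrds = new long[bucketCount];
        for (int i = 0; i < bucketCount; i++) {
            owningBucketOrds[i] = i; // one entry per bucket the parent created
        }
        // One call builds every result; results line up with the ords array.
        return sub.buildAggregations(owningBucketOrds);
    }

    static InternalAggregation buildRoot(Aggregator root) throws IOException {
        // At the top of the tree there is exactly one owning bucket: ordinal 0.
        return root.buildTopLevel();
    }
}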
@@ -146,14 +146,27 @@ public void collect(int doc, long bucket) throws IOException {
             }
 
             @Override
-            public InternalAggregation buildAggregation(long bucket) throws IOException {
-                if (bucket < aggregators.size()) {
-                    Aggregator aggregator = aggregators.get(bucket);
-                    if (aggregator != null) {
-                        return aggregator.buildAggregation(0);
+            public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+                InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
+                for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+                    if (owningBucketOrds[ordIdx] < aggregators.size()) {
+                        Aggregator aggregator = aggregators.get(owningBucketOrds[ordIdx]);
+                        if (aggregator != null) {
+                            /*
+                             * This is the same call as buildTopLevel but since
+                             * this aggregator may not be the top level we don't
+                             * call that method here. It'd be weird sounding. And
+                             * it'd trip assertions. Both bad.
+                             */
+                            results[ordIdx] = aggregator.buildAggregations(new long [] {0})[0];
+                        } else {
+                            results[ordIdx] = buildEmptyAggregation();
+                        }
+                    } else {
+                        results[ordIdx] = buildEmptyAggregation();
                     }
                 }
-                return buildEmptyAggregation();
+                return results;
             }
 
             @Override
@@ -232,7 +245,9 @@ public AggregatorFactory getParent() {
      * Utility method. Given an {@link AggregatorFactory} that creates
      * {@link Aggregator}s that only know how to collect bucket {@code 0}, this
      * returns an aggregator that can collect any bucket.
+     * @deprecated implement the aggregator to handle many owning buckets
      */
+    @Deprecated
     protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory, final SearchContext searchContext,
                                                         final Aggregator parent) throws IOException {
         final Aggregator first = factory.create(searchContext, parent, true);
@@ -363,5 +363,4 @@ public double sortValue(AggregationPath.PathElement head, Iterator<AggregationPa
         // subclasses will override this with a real implementation if you can sort on a descendant
         throw new IllegalArgumentException("Can't sort by a descendant of a [" + getType() + "] aggregation [" + head + "]");
     }
-
 }
@@ -21,6 +21,7 @@
 
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
+import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 
 import java.io.IOException;
 import java.util.stream.Stream;
@@ -73,9 +74,33 @@ public void collect(int doc, long bucket) throws IOException {
     }
 
     /**
-     * Collect the given doc in the given bucket.
+     * Collect the given {@code doc} in the bucket owned by
+     * {@code owningBucketOrd}.
+     * <p>
+     * The implementation of this method for metric aggregations is generally
+     * something along the lines of
+     * <pre>{@code
+     * array[owningBucketOrd] += loadValueFromDoc(doc)
+     * }</pre>
+     * <p>Bucket aggregations have more trouble because their job is to
+     * <strong>make</strong> new ordinals. So their implementation generally
+     * looks kind of like
+     * <pre>{@code
+     * long myBucketOrd = mapOwningBucketAndValueToMyOrd(owningBucketOrd, loadValueFromDoc(doc));
+     * collectBucket(doc, myBucketOrd);
+     * }</pre>
+     * <p>
+     * Some bucket aggregations "know" how many ordinals each owning ordinal
+     * needs so they can map "densely". The {@code range} aggregation, for
+     * example, can perform this mapping with something like:
+     * <pre>{@code
+     * return rangeCount * owningBucketOrd + matchingRange(value);
+     * }</pre>
+     * Other aggregations don't know how many buckets will fall into any
+     * particular owning bucket. The {@code terms} aggregation, for example,
+     * uses {@link LongKeyedBucketOrds} which amounts to a hash lookup.
      */
-    public abstract void collect(int doc, long bucket) throws IOException;
+    public abstract void collect(int doc, long owningBucketOrd) throws IOException;
 
     @Override
     public final void collect(int doc) throws IOException {
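
As a worked instance of the "dense" mapping in that javadoc (hypothetical
names; not code from this commit): with three ranges, a doc in owning
bucket 2 whose value matches range 1 lands in sub-ordinal 3 * 2 + 1 = 7.

// Dense packing: each owning ordinal reserves rangeCount consecutive sub-ordinals.
class DenseOrdPacking {
    static long subOrd(long owningBucketOrd, int matchingRange, int rangeCount) {
        return rangeCount * owningBucketOrd + matchingRange;
    }

    public static void main(String[] args) {
        System.out.println(subOrd(2, 1, 3)); // prints 7
    }
}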
@@ -34,7 +34,7 @@
 /**
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
- * in {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+ * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
  * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
  */
 public class MultiBucketConsumerService {
@@ -94,7 +94,7 @@ protected void metadataToXContent(XContentBuilder builder, Params params) throws
  * An {@link IntConsumer} that throws a {@link TooManyBucketsException}
  * when the sum of the provided values is above the limit (`search.max_buckets`).
  * It is used by aggregators to limit the number of bucket creation during
- * {@link Aggregator#buildAggregation} and {@link InternalAggregation#reduce}.
+ * {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
  */
 public static class MultiBucketConsumer implements IntConsumer {
     private final int limit;
@@ -48,7 +48,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext reader, Leaf
     }
 
     @Override
-    public final InternalAggregation buildAggregation(long owningBucketOrdinal) {
-        return buildEmptyAggregation();
+    public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
+        InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
+        for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+            results[ordIdx] = buildEmptyAggregation();
+        }
+        return results;
     }
 }
@@ -154,11 +154,10 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
             throw new IllegalStateException("Already been replayed");
         }
 
-        final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
-        for (long bucket : selectedBuckets) {
-            hash.add(bucket);
+        this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
+        for (long ord : selectedBuckets) {
+            this.selectedBuckets.add(ord);
         }
-        this.selectedBuckets = hash;
 
         boolean needsScores = scoreMode().needsScores();
         Weight weight = null;
@@ -185,7 +184,7 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
             for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
                 doc += docDeltaIterator.next();
                 final long bucket = buckets.next();
-                final long rebasedBucket = hash.find(bucket);
+                final long rebasedBucket = this.selectedBuckets.find(bucket);
                 if (rebasedBucket != -1) {
                     if (needsScores) {
                         if (scoreIt.docID() < doc) {
@@ -213,19 +212,20 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
     public Aggregator wrap(final Aggregator in) {
 
         return new WrappedAggregator(in) {
-
             @Override
-            public InternalAggregation buildAggregation(long bucket) throws IOException {
+            public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
                 if (selectedBuckets == null) {
                     throw new IllegalStateException("Collection has not been replayed yet.");
                 }
-                final long rebasedBucket = selectedBuckets.find(bucket);
-                if (rebasedBucket == -1) {
-                    throw new IllegalStateException("Cannot build for a bucket which has not been collected");
+                long[] rebasedOrds = new long[owningBucketOrds.length];
+                for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
+                    rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]);
+                    if (rebasedOrds[ordIdx] == -1) {
+                        throw new IllegalStateException("Cannot build for a bucket which has not been collected");
+                    }
                 }
-                return in.buildAggregation(rebasedBucket);
+                return in.buildAggregations(rebasedOrds);
             }
 
         };
     }
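
The rebasing above is worth spelling out (a toy model with hypothetical
names; a plain `HashMap` stands in for the `LongHash` the real code uses):
after `prepareSelectedBuckets` replays only the chosen buckets, the wrapped
aggregator knows them by dense ordinals 0..n-1 in selection order, and the
hash translates the caller's original ordinals into that dense space.

import java.util.HashMap;
import java.util.Map;

// Toy model of the ord rebasing in the deferring collector.
class ToyOrdRebasing {
    private final Map<Long, Long> selected = new HashMap<>();

    ToyOrdRebasing(long... selectedBuckets) {
        for (long ord : selectedBuckets) {
            // Dense ordinals are handed out in selection order, like LongHash.add.
            selected.putIfAbsent(ord, (long) selected.size());
        }
    }

    long rebase(long owningBucketOrd) {
        Long dense = selected.get(owningBucketOrd);
        if (dense == null) {
            throw new IllegalStateException("Cannot build for a bucket which has not been collected");
        }
        return dense;
    }
}

For example, new ToyOrdRebasing(5, 9, 12).rebase(9) returns 1, just as
selectedBuckets.find(9) would after the adds in prepareSelectedBuckets.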