Add support for dynamic pruning to cardinality aggregations on low-cardinality keyword fields. #92060

Merged May 1, 2023 (14 commits)
6 changes: 6 additions & 0 deletions docs/changelog/92060.yaml
@@ -0,0 +1,6 @@
pr: 92060
summary: Add support for dynamic pruning to cardinality aggregations on low-cardinality
  keyword fields
area: Aggregations
type: enhancement
issues: []
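For context, the kind of request this change speeds up is a top-level cardinality aggregation on a keyword field. A minimal sketch (the index name, aggregation name, and field name here are made up for illustration):

    POST /logs/_search
    {
      "size": 0,
      "aggs": {
        "unique_hosts": {
          "cardinality": { "field": "host.name" }
        }
      }
    }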
BitArray.java (org.elasticsearch.common.util.BitArray)
@@ -39,6 +39,18 @@ public void set(long index) {
bits.set(wordNum, bits.get(wordNum) | bitmask(index));
}

/**
* Set the {@code index}th bit and return {@code true} if the bit was set already.
*/
public boolean getAndSet(long index) {
long wordNum = wordNum(index);
bits = bigArrays.grow(bits, wordNum + 1);
long word = bits.get(wordNum);
long bitMask = bitmask(index);
bits.set(wordNum, word | bitMask);
return (word & bitMask) != 0;
}

/** this = this OR other */
public void or(BitArray other) {
or(other.bits);
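The new getAndSet turns "collect only on first sight" checks into a single read-modify-write. A minimal usage sketch, assuming the constructor and close semantics shown in the unit tests at the bottom of this diff (the example class and input are hypothetical):

    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.BitArray;

    class GetAndSetExample {
        static int countFirstSightings(long[] ords) {
            try (BitArray seen = new BitArray(8, BigArrays.NON_RECYCLING_INSTANCE)) {
                int firstSightings = 0;
                for (long ord : ords) {
                    // getAndSet returns false only the first time an ordinal is seen.
                    if (seen.getAndSet(ord) == false) {
                        firstSightings++;
                    }
                }
                return firstSightings;
            }
        }
    }

This is the same pattern the aggregator uses further down when deciding whether to notify the competitive iterator about a newly visited ordinal.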
@@ -157,10 +157,17 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
if (valuesSourceConfig.hasValues()) {
if (valuesSourceConfig.getValuesSource() instanceof final ValuesSource.Bytes.WithOrdinals source) {
if (executionMode.useGlobalOrdinals(context, source, precision)) {
final String field;
if (valuesSourceConfig.alignesWithSearchIndex()) {
field = valuesSourceConfig.fieldType().name();
} else {
field = null;
}
final long maxOrd = source.globalMaxOrd(context.searcher().getIndexReader());
return new GlobalOrdCardinalityAggregator(
name,
source,
field,
precision,
Math.toIntExact(maxOrd),
context,
GlobalOrdCardinalityAggregator.java (org.elasticsearch.search.aggregations.metrics.GlobalOrdCardinalityAggregator)
@@ -8,9 +8,16 @@

package org.elasticsearch.search.aggregations.metrics;

import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.hash.MurmurHash3;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
@@ -26,18 +33,35 @@
import org.elasticsearch.search.aggregations.support.ValuesSource;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

/**
* An aggregator that computes approximate counts of unique values
* using global ords.
*/
public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.SingleValue {

// Don't try to dynamically prune fields that have more than 1024 unique terms: there is a chance we'd never get down to 128
// unseen terms, and we'd be paying the overhead of dynamic pruning without getting any benefit.
private static final int MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING = 1024;

// Only start dynamic pruning once 128 or fewer ordinals remain unseen.
Review comment (Member): I'm confused by this comment. Do we prune when we have fewer than 128 or more than 128 ordinals?

Review comment (Member): The way I understand this is that if we have 128 or fewer unseen ordinals, then we prune.

private static final int MAX_TERMS_FOR_DYNAMIC_PRUNING = 128;

private final ValuesSource.Bytes.WithOrdinals valuesSource;
// The field that this cardinality aggregation runs on, or null if there is no field, or the field doesn't directly map to an index
// field.
private final String field;
private final BigArrays bigArrays;
private final int maxOrd;
private final int precision;
private int dynamicPruningAttempts;
private int dynamicPruningSuccess;
private int bruteForce;
private int noData;

// Build at post-collection phase
@Nullable
@@ -48,6 +72,7 @@ public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.SingleValue {
public GlobalOrdCardinalityAggregator(
String name,
ValuesSource.Bytes.WithOrdinals valuesSource,
String field,
int precision,
int maxOrd,
AggregationContext context,
@@ -56,6 +81,7 @@ public GlobalOrdCardinalityAggregator(
) throws IOException {
super(name, context, parent, metadata);
this.valuesSource = valuesSource;
this.field = field;
this.precision = precision;
this.maxOrd = maxOrd;
this.bigArrays = context.bigArrays();
@@ -64,12 +90,182 @@

@Override
public ScoreMode scoreMode() {
-        return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
+        if (field != null && valuesSource.needsScores() == false && maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING) {
+            return ScoreMode.TOP_DOCS;
+        } else if (valuesSource.needsScores()) {
+            return ScoreMode.COMPLETE;
+        } else {
+            return ScoreMode.COMPLETE_NO_SCORES;
+        }
}
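Returning ScoreMode.TOP_DOCS is what unlocks the pruning below: it tells Lucene that the collector does not need to see every matching document and may consult the collector's competitiveIterator() to skip non-competitive ones. Conceptually, the bulk scorer walks the conjunction of the query and that iterator, along the lines of this simplified sketch (not Lucene's actual scoring loop; the class and method names are illustrative):

    import java.io.IOException;

    import org.apache.lucene.search.DocIdSetIterator;
    import org.apache.lucene.search.LeafCollector;

    class CompetitiveScoringSketch {
        // Feed the collector only docs that match the query AND are still competitive.
        static void score(DocIdSetIterator query, DocIdSetIterator competitive, LeafCollector collector) throws IOException {
            int doc = query.nextDoc();
            while (doc != DocIdSetIterator.NO_MORE_DOCS) {
                int competitiveDoc = competitive.docID();
                if (competitiveDoc < doc) {
                    competitiveDoc = competitive.advance(doc);
                }
                if (competitiveDoc == doc) {
                    collector.collect(doc);
                    doc = query.nextDoc();
                } else {
                    // The competitive iterator skipped ahead: fast-forward the query as well.
                    doc = query.advance(competitiveDoc);
                }
            }
        }
    }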

/**
* A competitive iterator that helps only collect values that have not been collected so far.
*/
private class CompetitiveIterator extends DocIdSetIterator {

private final BitArray visitedOrds;
private long numNonVisitedOrds;
private final TermsEnum indexTerms;
private final DocIdSetIterator docsWithField;

CompetitiveIterator(int numNonVisitedOrds, BitArray visitedOrds, Terms indexTerms, DocIdSetIterator docsWithField)
throws IOException {
this.visitedOrds = visitedOrds;
this.numNonVisitedOrds = numNonVisitedOrds;
this.indexTerms = Objects.requireNonNull(indexTerms).iterator();
this.docsWithField = docsWithField;
}

private Map<Long, PostingsEnum> nonVisitedOrds;
Review comment (Member): Given that this optimisation only applies to fields with <= 1024 terms, a LongObjectPagedHashMap isn't needed here.

private PriorityQueue<PostingsEnum> nonVisitedPostings;

private int doc = -1;

@Override
public int docID() {
return doc;
}

@Override
public int nextDoc() throws IOException {
return advance(doc + 1);
}

@Override
public int advance(int target) throws IOException {
if (nonVisitedPostings == null) {
// We haven't started pruning yet, iterate on docs that have a value. This may already help a lot on sparse fields.
return doc = docsWithField.advance(target);
} else if (nonVisitedPostings.size() == 0) {
return doc = DocIdSetIterator.NO_MORE_DOCS;
} else {
PostingsEnum top = nonVisitedPostings.top();
while (top.docID() < target) {
top.advance(target);
top = nonVisitedPostings.updateTop();
}
return doc = top.docID();
}
}

@Override
public long cost() {
return docsWithField.cost();
}

void startPruning() throws IOException {
dynamicPruningSuccess++;
nonVisitedOrds = new HashMap<>();
// TODO: iterate the bitset using a `nextClearBit` operation?
Review comment (Member): Letting this loop be led by nextClearBit() is a good idea 👍 I think we can add that method to BitArray and make use of it in a follow-up change.
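To make that follow-up idea concrete, here is a hedged sketch of what a nextClearBit helper on BitArray could look like. This is hypothetical code, not part of this PR, and it assumes the same private wordNum(long) helper and LongArray bits field used by set and getAndSet above:

    /**
     * Hypothetical helper: index of the first clear bit at or after {@code index}.
     * Bits beyond the end of the backing array are considered clear.
     */
    public long nextClearBit(long index) {
        long wordNum = wordNum(index);
        final long numWords = bits.size();
        if (wordNum >= numWords) {
            return index;
        }
        // Treat the bits below `index` within the first word as set so they are skipped.
        // Java masks long shift counts to the low 6 bits, so this uses index & 63 implicitly.
        long word = bits.get(wordNum) | ((1L << index) - 1);
        while (word == ~0L) {
            if (++wordNum >= numWords) {
                return wordNum << 6; // everything past the backing array is clear
            }
            word = bits.get(wordNum);
        }
        return (wordNum << 6) + Long.numberOfTrailingZeros(~word);
    }

With such a helper, the loop below could become for (long ord = visitedOrds.nextClearBit(0); ord < maxOrd; ord = visitedOrds.nextClearBit(ord + 1)) and skip visited ranges in bulk.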

for (long ord = 0; ord < maxOrd; ++ord) {
if (visitedOrds.get(ord)) {
continue;
}
BytesRef term = values.lookupOrd(ord);
if (indexTerms.seekExact(term) == false) {
// This global ordinal maps to a value that doesn't exist in this segment
continue;
}
nonVisitedOrds.put(ord, indexTerms.postings(null, PostingsEnum.NONE));
}
nonVisitedPostings = new PriorityQueue<>(nonVisitedOrds.size()) {
@Override
protected boolean lessThan(PostingsEnum a, PostingsEnum b) {
return a.docID() < b.docID();
}
};
for (PostingsEnum pe : nonVisitedOrds.values()) {
nonVisitedPostings.add(pe);
}
}

void onVisitedOrdinal(long ordinal) throws IOException {
numNonVisitedOrds--;
if (nonVisitedOrds == null) {
if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
startPruning();
}
} else {
if (nonVisitedOrds.remove(ordinal) != null) {
// Could we make this more efficient?
nonVisitedPostings.clear();
for (PostingsEnum pe : nonVisitedOrds.values()) {
nonVisitedPostings.add(pe);
}
}
}
}
}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
values = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext());

if (parent == null && field != null) {
// This optimization only applies to top-level cardinality aggregations that apply to fields indexed with an inverted index.
final Terms indexTerms = aggCtx.getLeafReaderContext().reader().terms(field);
if (indexTerms != null) {
BitArray bits = visitedOrds.get(0);
final int numNonVisitedOrds = maxOrd - (bits == null ? 0 : (int) bits.cardinality());
if (maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING || numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
Review comment (Member): So the optimisation also kicks in on fields with more than 1024 unique values, if there are 128 or fewer terms left to be processed. The brute-force leaf bucket collector implementation does update the bit sets used here to determine numNonVisitedOrds.

dynamicPruningAttempts++;
return new LeafBucketCollector() {

final BitArray bits;
final CompetitiveIterator competitiveIterator;

{
// This optimization only works for top-level cardinality aggregations that collect bucket 0, so we can retrieve
Review comment (Member): If this is the only current limitation, then I think we can get around it by creating the CompetitiveIterator instance lazily? We would then need a CompetitiveIterator per bucket ordinal. Not sure if this can work, but it could be explored in a follow-up PR.

// the appropriate BitArray ahead of time.
visitedOrds = bigArrays.grow(visitedOrds, 1);
BitArray bits = visitedOrds.get(0);
if (bits == null) {
bits = new BitArray(maxOrd, bigArrays);
visitedOrds.set(0, bits);
}
this.bits = bits;
final DocIdSetIterator docsWithField = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext());
competitiveIterator = new CompetitiveIterator(numNonVisitedOrds, bits, indexTerms, docsWithField);
if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
competitiveIterator.startPruning();
}
}

@Override
public void collect(int doc, long bucketOrd) throws IOException {
if (values.advanceExact(doc)) {
for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
if (bits.getAndSet(ord) == false) {
competitiveIterator.onVisitedOrdinal(ord);
}
}
}
}

@Override
public CompetitiveIterator competitiveIterator() {
return competitiveIterator;
}
};
}
} else {
final FieldInfo fi = aggCtx.getLeafReaderContext().reader().getFieldInfos().fieldInfo(field);
if (fi == null) {
// The field doesn't exist at all, we can skip the segment entirely
noData++;
return LeafBucketCollector.NO_OP_COLLECTOR;
} else if (fi.getIndexOptions() != IndexOptions.NONE) {
// The field is indexed (index options are not NONE) but has no terms in this segment. This means that this segment
// doesn't have a single value for the field.
noData++;
return LeafBucketCollector.NO_OP_COLLECTOR;
}
// Otherwise we might be aggregating e.g. an IP field, which indexes data using points rather than an inverted index.
Review comment (Member): Maybe in a follow-up change, we can also do the same trick here with points?

}
}

bruteForce++;
return new LeafBucketCollector() {
@Override
public void collect(int doc, long bucketOrd) throws IOException {
@@ -157,4 +353,13 @@ protected void doClose() {
}
Releasables.close(visitedOrds, counts);
}

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("dynamic_pruning_attempted", dynamicPruningAttempts);
add.accept("dynamic_pruning_used", dynamicPruningSuccess);
add.accept("brute_force_used", bruteForce);
add.accept("skipped_due_to_no_data", noData);
}
}
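These counters are reported through the search profile API, which makes it possible to verify whether dynamic pruning actually kicked in for a given search. A hedged sketch of how the aggregation's debug section might render in a profiled response (the surrounding profile output is abbreviated and the numbers are made up):

    "debug" : {
      "dynamic_pruning_attempted" : 2,
      "dynamic_pruning_used" : 1,
      "brute_force_used" : 0,
      "skipped_due_to_no_data" : 1
    }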
@@ -154,4 +154,21 @@ public void testCardinality() {
}
}
}

public void testGetAndSet() {
try (BitArray bitArray = new BitArray(1, BigArrays.NON_RECYCLING_INSTANCE)) {
assertFalse(bitArray.getAndSet(100));
assertFalse(bitArray.getAndSet(1000));
assertTrue(bitArray.getAndSet(100));
assertFalse(bitArray.getAndSet(101));
assertFalse(bitArray.getAndSet(999));
assertTrue(bitArray.getAndSet(1000));
assertFalse(bitArray.get(99));
assertTrue(bitArray.get(100));
assertTrue(bitArray.get(101));
assertTrue(bitArray.get(999));
assertTrue(bitArray.get(1000));
assertFalse(bitArray.get(1001));
}
}
}