8 changes: 4 additions & 4 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
@@ -7,7 +7,7 @@
++++

A `rate` metrics aggregation can be used only inside a `date_histogram` or `composite` aggregation. It calculates a rate of documents
or a field in each bucket. The field values can be generated extracted from specific numeric or
or a field in each bucket. The field values can be extracted from specific numeric or
<<histogram,histogram fields>> in the documents.

NOTE: For `composite` aggregations, there must be exactly one `date_histogram` source for the `rate` aggregation to be supported.
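As an illustrative sketch (not part of this change), a `rate` nested under a `composite` aggregation with a single `date_histogram` source could look like the following; the index, field, and aggregation names here are assumptions:

[source,console]
--------------------------------------------------
GET sales/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "composite": {
        "sources": [
          { "date": { "date_histogram": { "field": "date", "calendar_interval": "month" } } }
        ]
      },
      "aggs": {
        "my_rate": {
          "rate": { "unit": "year" }
        }
      }
    }
  }
}
--------------------------------------------------
// NOTCONSOLE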
@@ -27,7 +27,7 @@ A `rate` aggregation looks like this in isolation:
--------------------------------------------------
// NOTCONSOLE

The following request will group all sales records into monthly bucket and than convert the number of sales transaction in each bucket
The following request will group all sales records into monthly buckets and then convert the number of sales transactions in each bucket
into an annual sales rate.

[source,console]
@@ -56,8 +56,8 @@ GET sales/_search
<1> Histogram is grouped by month.
<2> But the rate is converted into an annual rate.

The response will return the annual rate of transaction in each bucket. Since there are 12 months per year, the annual rate will
be automatically calculated by multiplying monthly rate by 12.
The response will return the annual rate of transactions in each bucket. Since there are 12 months per year, the annual rate will
be automatically calculated by multiplying the monthly rate by 12.

[source,console-result]
--------------------------------------------------
AggregatorBase.java

@@ -14,6 +14,7 @@
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.aggregations.bucket.DocCountProvider;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator;
import org.elasticsearch.search.aggregations.metrics.MinAggregator;
import org.elasticsearch.search.aggregations.metrics.SumAggregator;
@@ -38,6 +39,7 @@ public abstract class AggregatorBase extends Aggregator {

protected final String name;
protected final Aggregator parent;
protected final DocCountProvider docCountProvider;
private final AggregationContext context;
private final Map<String, Object> metadata;

@@ -102,6 +104,7 @@ public ScoreMode scoreMode() {
}
};
addRequestCircuitBreakerBytes(DEFAULT_WEIGHT);
docCountProvider = new DocCountProvider();
}

/**
@@ -220,7 +223,10 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws
 * Can be overridden by aggregator implementations that need to perform an operation before the leaf collectors
* of children aggregators are instantiated for the next segment.
*/
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {}
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
// Set LeafReaderContext to the doc_count provider
docCountProvider.setLeafReaderContext(ctx);
}

/**
* Can be overridden by aggregator implementation to be called back when the collection phase starts.
BucketsAggregator.java

@@ -7,7 +7,6 @@
*/
package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
@@ -40,7 +39,6 @@
public abstract class BucketsAggregator extends AggregatorBase {
private final IntConsumer multiBucketConsumer;
private LongArray docCounts;
protected final DocCountProvider docCountProvider;

public BucketsAggregator(
String name,
@@ -53,7 +51,6 @@ public BucketsAggregator(
super(name, factories, context, parent, bucketCardinality, metadata);
multiBucketConsumer = context.multiBucketConsumer();
docCounts = bigArrays().newLongArray(1, true);
docCountProvider = new DocCountProvider();
}

/**
@@ -418,11 +415,4 @@ public static boolean descendsFromGlobalAggregator(Aggregator parent) {
}
return false;
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
// Set LeafReaderContext to the doc_count provider
docCountProvider.setLeafReaderContext(ctx);
}
}
AbstractRateAggregator.java

@@ -28,6 +28,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si
private final DocValueFormat format;
private final Rounding.DateTimeUnit rateUnit;
protected final RateMode rateMode;
protected final boolean computeRateOnDocs;
private final SizedBucketAggregator sizedBucketAggregator;

protected DoubleArray sums;
@@ -54,6 +55,11 @@ public AbstractRateAggregator(
}
this.rateUnit = rateUnit;
this.rateMode = rateMode;

// If no fields or scripts have been defined in the agg, rate should be computed based on bucket doc_counts
computeRateOnDocs = valuesSourceConfig.fieldContext() == null
&& valuesSourceConfig.script() == null
&& valuesSourceConfig.scriptValueType() == null;
this.sizedBucketAggregator = findSizedBucketAncestor();
}

@@ -43,14 +43,18 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

if (values.advanceExact(doc)) {
if (computeRateOnDocs) {
final int docCount = docCountProvider.getDocCount(doc);
kahanSummation.add(docCount);
} else if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);

switch (rateMode) {
case SUM:
for (int i = 0; i < valuesCount; i++) {
@@ -63,10 +67,10 @@ public void collect(int doc, long bucket) throws IOException {
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}

compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}

compensations.set(bucket, kahanSummation.delta());
Review comment (Member):

Two things jump out at me here:

  1. When we're summing doc count or value count we might be better off storing all this in long instead of double. Then we wouldn't need Kahan at all.
  2. This does kahan stuff if there aren't any values for the field. I'll bet most of the time folks use this on pretty dense fields. But this isn't a change I'd want to make in our (hopefully) conservative 7.16 release. Again, it probably doesn't have a performance impact in most cases, but 7.16 will live a long long time so the odds are better than normal that it'll come up.

Reply from @csoulios (Contributor, Author), Oct 18, 2021:
Very good observations.

> 1. When we're summing doc count or value count we might be better off storing all this in long instead of double. Then we wouldn't need Kahan at all.

This can be a good optimization mostly because we save memory by using a LongArray instead of two DoubleArrays. I was just thinking that this could be a separate PR because it requires more subtle handling in methods such as metric(long) and buildAggregation(long).
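A rough sketch of that direction (not part of this PR): when the rate is driven purely by doc counts, the per-bucket totals could be accumulated exactly in a `LongArray`, so no Kahan compensation arrays would be needed. The `counts` field and its lifecycle are assumptions here:

```java
// Hypothetical alternative for the doc_count-driven case: exact integer accumulation.
private LongArray counts; // assumed field, grown and released like `sums`

@Override
public void collect(int doc, long bucket) throws IOException {
    counts = bigArrays().grow(counts, bucket + 1);
    // DocCountProvider reports the document's _doc_count (1 when the field is absent)
    counts.increment(bucket, docCountProvider.getDocCount(doc));
}
```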

> 2. This does kahan stuff if there aren't any values for the field. I'll bet most of the time folks use this on pretty dense fields. But this isn't a change I'd want to make in our (hopefully) conservative 7.16 release. Again, it probably doesn't have a performance impact in most cases, but 7.16 will live a long long time so the odds are better than normal that it'll come up.

All "expensive" Kahan computations happen in the kahan.add() method. kahan.reset(), kahan.value() and kahan.delta() are plain setters and getters. The only possible overhead I can see here is the BigArrays operations. So, I moved those into the conditionals so that they are performed only if values exist.

sums.set(bucket, kahanSummation.value());
}
};
}
@@ -24,6 +24,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.CustomTermFreqField;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -859,6 +860,18 @@ public void testModeWithoutField() {
assertEquals("The mode parameter is only supported with field or script", ex.getMessage());
}

public void testWithCustomDocCount() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "month", null, iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new CustomTermFreqField("_doc_count", "_doc_count", 10)));
iw.addDocument(doc("2010-04-01T03:43:34"));
iw.addDocument(doc("2010-04-27T03:43:34", new CustomTermFreqField("_doc_count", "_doc_count", 5)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(10.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(6.0, 0.000001));
});
}

private static AbstractAggregationBuilder<?> randomValidMultiBucketAggBuilder(
RateAggregationBuilder rateAggregationBuilder,
DateHistogramInterval interval
@@ -34,3 +34,41 @@ setup:
- length: { aggregations.by_date.buckets: 2 }
- match: { aggregations.by_date.buckets.0.rate.value: 1.0 }
- match: { aggregations.by_date.buckets.1.rate.value: 2.0 }


---
"rate with doc_count":
- do:
bulk:
index: test2
refresh: true
body:
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:33:37.477Z", "_doc_count": 10}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:35:37.477Z", "_doc_count": 5}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:35:38.477Z", "_doc_count": 1}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:36:08.477Z"}'
- do:
search:
size: 0
index: "test2"
body:
aggs:
by_date:
date_histogram:
field: timestamp
fixed_interval: 60s
aggs:
rate:
rate:
unit: minute

- length: { aggregations.by_date.buckets: 4 }
- match: { aggregations.by_date.buckets.0.rate.value: 10.0 }
- match: { aggregations.by_date.buckets.1.rate.value: 0.0 }
- match: { aggregations.by_date.buckets.2.rate.value: 6.0 }
- match: { aggregations.by_date.buckets.3.rate.value: 1.0 }