Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/145089.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: Downsampling
issues: []
pr: 145089
summary: Collect dimensions only once per tsid when downsampling
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -1993,8 +1993,6 @@ setup:
index.blocks.write: true

- do:
allowed_warnings:
- "Parameter [default_metric] is deprecated and will be removed in a future version"
indices.downsample:
index: test-multi-value
target_index: test-downsample-multi-value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ void increaseFormattedValueFields() {

// Bumps the running count of dimension fields seen while preparing the downsample run.
void increaseDimensionFields() {
dimensionFields++;
// NOTE(review): per the hunk header (-158,7 +158,6) exactly one line was removed here;
// this increment appears to be the deleted diff line — confirm against the full file.
formattedValueFields++;
}

void increaseExponentialHistogramFields() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

package org.elasticsearch.xpack.downsample;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.index.fielddata.FormattedDocValues;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -22,59 +25,73 @@
import java.util.Objects;

/**
* The dimension field producer is effectively a last value field producer that performs some extra validations when assertions are enabled.
* It checks:
* - that a tsid is only collected once, and
* - that all TSIDs that are being collected for a round have the same value.
* Important note: This class assumes that field values are collected and sorted by descending order by time
* The dimension field downsampler reads the value of the last seen document per tsid, even if it's missing. Considering that dimensions
* are the same for the same tsid, it is guaranteed that all documents with a tsid will have the same value. Consequently, we do not reset
* it every time we start a new bucket but only if the tsid changes.
*/
public class DimensionFieldDownsampler extends LastValueFieldDownsampler {
public final class DimensionFieldDownsampler extends AbstractFieldDownsampler<FormattedDocValues> {

private final MappedFieldType fieldType;
private Object dimensionValue = null;

DimensionFieldDownsampler(final String name, final MappedFieldType fieldType, final IndexFieldData<?> fieldData) {
super(name, fieldType, fieldData);
super(name, fieldData);
this.fieldType = fieldType;
}

// Intentionally a no-op: dimension values are constant for a given tsid, so they are
// cleared in tsidReset() when the tsid changes rather than on every bucket reset.
@Override
public void reset() {
// We do not reset dimensions unless tsid is reset.
}

void collectOnce(final Object value) {
assert isEmpty;
Objects.requireNonNull(value);
this.lastValue = value;
this.isEmpty = false;
public void tsidReset() {
isEmpty = true;
dimensionValue = null;
}

/**
* This is an expensive check that slows down downsampling significantly.
* Given that index is sorted by tsid as a primary key, this shouldn't really happen.
* Use {@link #collectOnce(FormattedDocValues, IntArrayList)} instead.
* throws UnsupportedOperationException
*/
boolean validate(FormattedDocValues docValues, IntArrayList buffer) throws IOException {
for (int i = 0; i < buffer.size(); i++) {
int docId = buffer.get(i);
/**
 * Not supported for dimension fields: callers must use {@code collectOnce} instead so the
 * dimension value is read at most once per tsid.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
throw new UnsupportedOperationException("This producer should be collected using the collectOnce method.");
}

/**
 * Records the dimension value for the current tsid from the first document in the buffer.
 * Intended to be invoked at most once per tsid; that contract is enforced only by an
 * assertion (for performance), so violations surface during development, not in production.
 */
public void collectOnce(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
// We only ensure we collect once with an assertion because we do it for performance reasons,
// and it should be detected during development.
assert isEmpty() : "dimension downsamplers should only be called once per tsid";

// Only need to record one dimension value from one document; within the same tsid-and-time-interval bucket the values are the same.
if (docIdBuffer.isEmpty() == false) {
int docId = docIdBuffer.get(0);
if (docValues.advanceExact(docId)) {
int docValueCount = docValues.docValueCount();
assert docValueCount > 0;
var value = retrieveDimensionValues(docValues);
// NOTE(review): 'lastValue' is not declared in the visible fragment (only 'dimensionValue' is);
// these two assert lines may be residue of diff interleaving — confirm against the full file.
assert value.equals(this.lastValue) != false
: "Dimension value changed without tsid change [" + value + "] != [" + this.lastValue + "]";
Objects.requireNonNull(value);
this.dimensionValue = value;
this.isEmpty = false;
}
}
}

return true;
/**
 * Loads the per-leaf formatted doc values for this field, using the field type's
 * doc-value format with no custom format string or time zone (both null).
 */
@Override
public FormattedDocValues getLeaf(LeafReaderContext context) {
DocValueFormat format = fieldType.docValueFormat(null, null);
return fieldData.load(context).getFormattedValues(format);
}

@Override
public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException {
public void write(XContentBuilder builder) throws IOException {
if (isEmpty() == false) {
assert validate(docValues, docIdBuffer);
return;
builder.field(name(), dimensionValue);
}
}

for (int i = 0; i < docIdBuffer.size(); i++) {
int docId = docIdBuffer.get(i);
if (docValues.advanceExact(docId) == false) {
continue;
}
collectOnce(retrieveDimensionValues(docValues));
// Only need to record one dimension value from one document, within in the same tsid-and-time-interval bucket values are the
// same.
return;
}
// Exposes the dimension value captured for the current tsid; null until collectOnce
// records one, and cleared again by tsidReset when the tsid changes.
public Object dimensionValue() {
return dimensionValue;
}

private Object retrieveDimensionValues(FormattedDocValues docValues) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,13 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
}

private class TimeSeriesBucketCollector extends BucketCollector {
// Constants to reduce object allocation when we do not need documents for counter resets
private static final DimensionFieldDownsampler[] EMPTY_DIMENSIONS_FOR_COUNTER_RESETS = new DimensionFieldDownsampler[0];
private static final NumericMetricFieldDownsampler.AggregateCounter[] EMPTY_AGGREGATE_COUNTERS =
new NumericMetricFieldDownsampler.AggregateCounter[0];
private final BulkProcessor2 bulkProcessor;
private final DownsampleBucketBuilder downsampleBucketBuilder;
private LeafDownsampleCollector currentLeafCollector;
// Downsamplers grouped by the doc value input they expect, we use primitive arrays to reduce the footprint.
private final DimensionFieldDownsampler[] dimensionDownsamplers;
private final LastValueFieldDownsampler[] formattedDocValuesDownsamplers;
private final ExponentialHistogramFieldDownsampler[] exponentialHistogramDownsamplers;
private final TDigestHistogramFieldDownsampler[] tDigestHistogramDownsamplers;
Expand All @@ -386,6 +385,8 @@ private class TimeSeriesBucketCollector extends BucketCollector {

TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor, String[] dimensions) {
this.bulkProcessor = bulkProcessor;
int dimensionFieldIndex = 0;
this.dimensionDownsamplers = new DimensionFieldDownsampler[fieldCounts.dimensionFields()];
int numericFieldIndex = 0;
this.numericDownsamplers = new NumericMetricFieldDownsampler[fieldCounts.numericFields()];
int formattedValueFieldIndex = 0;
Expand All @@ -398,10 +399,6 @@ private class TimeSeriesBucketCollector extends BucketCollector {
this.aggregateCounterDownsamplers = fieldCounts.aggregateCounterFields() == 0
? EMPTY_AGGREGATE_COUNTERS
: new NumericMetricFieldDownsampler.AggregateCounter[fieldCounts.aggregateCounterFields()];
int dimensionFieldIndex = 0;
DimensionFieldDownsampler[] dimensionDownsamplers = fieldCounts.aggregateCounterFields() == 0
? EMPTY_DIMENSIONS_FOR_COUNTER_RESETS
: new DimensionFieldDownsampler[fieldCounts.dimensionFields()];

for (AbstractFieldDownsampler<?> fieldDownsampler : fieldDownsamplers) {
switch (fieldDownsampler) {
Expand All @@ -413,14 +410,13 @@ private class TimeSeriesBucketCollector extends BucketCollector {
assert numericFieldIndex < numericDownsamplers.length;
numericDownsamplers[numericFieldIndex++] = numericMetricDownsampler;
}
case DimensionFieldDownsampler dimensionDownsampler -> {
assert dimensionFieldIndex < dimensionDownsamplers.length;
dimensionDownsamplers[dimensionFieldIndex++] = dimensionDownsampler;
}
case LastValueFieldDownsampler lastValueDownsampler -> {
assert formattedValueFieldIndex < formattedDocValuesDownsamplers.length;
formattedDocValuesDownsamplers[formattedValueFieldIndex++] = lastValueDownsampler;
if (dimensionDownsamplers.length > 0
&& lastValueDownsampler instanceof DimensionFieldDownsampler dimensionFieldDownsampler) {
assert dimensionFieldIndex < dimensionDownsamplers.length;
dimensionDownsamplers[dimensionFieldIndex++] = dimensionFieldDownsampler;
}
}
case ExponentialHistogramFieldDownsampler exponentialHistogramDownsampler -> {
assert exponentialHistogramFieldIndex < exponentialHistogramDownsamplers.length;
Expand Down Expand Up @@ -457,6 +453,10 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
for (int i = 0; i < formattedDocValuesDownsamplers.length; i++) {
formattedDocValues[i] = formattedDocValuesDownsamplers[i].getLeaf(ctx);
}
var dimensionDocValues = new FormattedDocValues[dimensionDownsamplers.length];
for (int i = 0; i < dimensionDownsamplers.length; i++) {
dimensionDocValues[i] = dimensionDownsamplers[i].getLeaf(ctx);
}
var exponentialHistogramValues = new ExponentialHistogramValuesReader[exponentialHistogramDownsamplers.length];
for (int i = 0; i < exponentialHistogramDownsamplers.length; i++) {
exponentialHistogramValues[i] = exponentialHistogramDownsamplers[i].getLeaf(ctx);
Expand All @@ -475,6 +475,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
return new LeafDownsampleCollector(
aggCtx,
docCountProvider,
dimensionDocValues,
numericValues,
formattedDocValues,
exponentialHistogramValues,
Expand All @@ -494,6 +495,7 @@ class LeafDownsampleCollector extends LeafBucketCollector {

final AggregationExecutionContext aggCtx;
final DocCountProvider docCountProvider;
final FormattedDocValues[] dimensionDocValues;
final SortedNumericDoubleValues[] numericValues;
final FormattedDocValues[] formattedDocValues;
final ExponentialHistogramValuesReader[] exponentialHistogramValues;
Expand All @@ -507,6 +509,7 @@ class LeafDownsampleCollector extends LeafBucketCollector {
LeafDownsampleCollector(
AggregationExecutionContext aggCtx,
DocCountProvider docCountProvider,
FormattedDocValues[] dimensionDocValues,
SortedNumericDoubleValues[] numericValues,
FormattedDocValues[] formattedDocValues,
ExponentialHistogramValuesReader[] exponentialHistogramValues,
Expand All @@ -516,6 +519,7 @@ class LeafDownsampleCollector extends LeafBucketCollector {
) {
this.aggCtx = aggCtx;
this.docCountProvider = docCountProvider;
this.dimensionDocValues = dimensionDocValues;
this.numericValues = numericValues;
this.formattedDocValues = formattedDocValues;
this.exponentialHistogramValues = exponentialHistogramValues;
Expand Down Expand Up @@ -592,6 +596,18 @@ void leafBulkCollection() throws IOException {
collect(formattedDocValuesDownsamplers, formattedDocValues);
collect(exponentialHistogramDownsamplers, exponentialHistogramValues);
collect(tDigestHistogramDownsamplers, tDigestHistogramValues);
if (downsampleBucketBuilder.dimensionsCollected == false) {
assert dimensionDownsamplers.length == dimensionDocValues.length
: "Number of downsamplers ["
+ dimensionDownsamplers.length
+ "] does not match number of doc values ["
+ dimensionDocValues.length
+ "]";
for (int i = 0; i < dimensionDownsamplers.length; i++) {
dimensionDownsamplers[i].collectOnce(dimensionDocValues[i], docIdBuffer);
}
downsampleBucketBuilder.dimensionsCollected = true;
}
if (aggregateCounterDownsamplers.length > 0) {
assert timestampValues != null;
long[] timestamps = TimestampValueFetcher.fetch(timestampValues, docIdBuffer);
Expand Down Expand Up @@ -694,6 +710,7 @@ private class DownsampleBucketBuilder {
private long timestamp;
private int docCount;
private CounterResetDataPoints counterResetDataPoints;
private boolean dimensionsCollected = false;
// A list of all the downsamplers so we can reset them before moving on to the next bucket
private final List<AbstractFieldDownsampler<?>> fieldDownsamplers;
// An array of field serializers, each field has one serializer which can group one or more AbstractFieldDownsamplers
Expand Down Expand Up @@ -739,6 +756,11 @@ public void resetTsid(BytesRef tsid, int tsidOrd, long timestamp) {
for (int i = 0; i < aggregateCounterDownsamplers.length; i++) {
aggregateCounterDownsamplers[i].tsidReset();
}
// Reset dimension downsamplers
for (int i = 0; i < dimensionDownsamplers.length; i++) {
dimensionDownsamplers[i].tsidReset();
}
dimensionsCollected = false;
}

/**
Expand Down
Loading
Loading