diff --git a/docs/changelog/145089.yaml b/docs/changelog/145089.yaml new file mode 100644 index 0000000000000..51e902f89a13c --- /dev/null +++ b/docs/changelog/145089.yaml @@ -0,0 +1,5 @@ +area: Downsampling +issues: [] +pr: 145089 +summary: Collect dimensions only once per tsid when downsampling +type: enhancement diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index 498b4e475b32b..8def8015a60d0 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -1993,8 +1993,6 @@ setup: index.blocks.write: true - do: - allowed_warnings: - - "Parameter [default_metric] is deprecated and will be removed in a future version" indices.downsample: index: test-multi-value target_index: test-downsample-multi-value diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java index 9b0088cc4b7d2..3943b2ddd8f8d 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/AbstractFieldDownsampler.java @@ -158,7 +158,6 @@ void increaseFormattedValueFields() { void increaseDimensionFields() { dimensionFields++; - formattedValueFields++; } void increaseExponentialHistogramFields() { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java index 3207d407292c8..78a273e881394 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsampler.java @@ -7,12 +7,15 @@ package org.elasticsearch.xpack.downsample; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.internal.hppc.IntArrayList; import org.elasticsearch.index.fielddata.FormattedDocValues; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper; import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; @@ -22,59 +25,73 @@ import java.util.Objects; /** - * The dimension field producer is effectively a last value field producer that performs some extra validations when assertions are enabled. - * It checks: - * - that a tsid is only collected once, and - * - that all TSIDs that are being collected for a round have the same value. - * Important note: This class assumes that field values are collected and sorted by descending order by time + * The dimension field downsampler reads the value of the last seen document per tsid, even if it's missing. Considering that dimensions + * are the same for the same tsid, it is guaranteed that all documents with a tsid will have the same value. Consequently, we do not reset + * it every time we start a new bucket but only if the tsid changes. */ -public class DimensionFieldDownsampler extends LastValueFieldDownsampler { +public final class DimensionFieldDownsampler extends AbstractFieldDownsampler { + + private final MappedFieldType fieldType; + private Object dimensionValue = null; DimensionFieldDownsampler(final String name, final MappedFieldType fieldType, final IndexFieldData fieldData) { - super(name, fieldType, fieldData); + super(name, fieldData); + this.fieldType = fieldType; + } + + @Override + public void reset() { + // We do not reset dimensions unless tsid is reset. } - void collectOnce(final Object value) { - assert isEmpty; - Objects.requireNonNull(value); - this.lastValue = value; - this.isEmpty = false; + public void tsidReset() { + isEmpty = true; + dimensionValue = null; } /** - * This is an expensive check that slows down downsampling significantly. - * Given that index is sorted by tsid as a primary key, this shouldn't really happen. + * Use {@link #collectOnce(FormattedDocValues, IntArrayList)} instead. + * throws UnsupportedOperationException */ - boolean validate(FormattedDocValues docValues, IntArrayList buffer) throws IOException { - for (int i = 0; i < buffer.size(); i++) { - int docId = buffer.get(i); + @Override + public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + throw new UnsupportedOperationException("This producer should be collected using the collectOnce method."); + } + + public void collectOnce(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + // We only ensure we collect once with an assertion because we do it for performance reasons, + // and it should be detected during development. + assert isEmpty() : "dimension downsamplers should only be called once per tsid"; + + // Only need to record one dimension value from one document, within in the same tsid-and-time-interval bucket values are the same. + if (docIdBuffer.isEmpty() == false) { + int docId = docIdBuffer.get(0); if (docValues.advanceExact(docId)) { + int docValueCount = docValues.docValueCount(); + assert docValueCount > 0; var value = retrieveDimensionValues(docValues); - assert value.equals(this.lastValue) != false - : "Dimension value changed without tsid change [" + value + "] != [" + this.lastValue + "]"; + Objects.requireNonNull(value); + this.dimensionValue = value; + this.isEmpty = false; } } + } - return true; + @Override + public FormattedDocValues getLeaf(LeafReaderContext context) { + DocValueFormat format = fieldType.docValueFormat(null, null); + return fieldData.load(context).getFormattedValues(format); } @Override - public void collect(FormattedDocValues docValues, IntArrayList docIdBuffer) throws IOException { + public void write(XContentBuilder builder) throws IOException { if (isEmpty() == false) { - assert validate(docValues, docIdBuffer); - return; + builder.field(name(), dimensionValue); } + } - for (int i = 0; i < docIdBuffer.size(); i++) { - int docId = docIdBuffer.get(i); - if (docValues.advanceExact(docId) == false) { - continue; - } - collectOnce(retrieveDimensionValues(docValues)); - // Only need to record one dimension value from one document, within in the same tsid-and-time-interval bucket values are the - // same. - return; - } + public Object dimensionValue() { + return dimensionValue; } private Object retrieveDimensionValues(FormattedDocValues docValues) throws IOException { diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java index 77c2da34ddd7d..5c07781160767 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardIndexer.java @@ -364,14 +364,13 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure) } private class TimeSeriesBucketCollector extends BucketCollector { - // Constants to reduce object allocation when we do not need documents for counter resets - private static final DimensionFieldDownsampler[] EMPTY_DIMENSIONS_FOR_COUNTER_RESETS = new DimensionFieldDownsampler[0]; private static final NumericMetricFieldDownsampler.AggregateCounter[] EMPTY_AGGREGATE_COUNTERS = new NumericMetricFieldDownsampler.AggregateCounter[0]; private final BulkProcessor2 bulkProcessor; private final DownsampleBucketBuilder downsampleBucketBuilder; private LeafDownsampleCollector currentLeafCollector; // Downsamplers grouped by the doc value input they expect, we use primitive arrays to reduce the footprint. + private final DimensionFieldDownsampler[] dimensionDownsamplers; private final LastValueFieldDownsampler[] formattedDocValuesDownsamplers; private final ExponentialHistogramFieldDownsampler[] exponentialHistogramDownsamplers; private final TDigestHistogramFieldDownsampler[] tDigestHistogramDownsamplers; @@ -386,6 +385,8 @@ private class TimeSeriesBucketCollector extends BucketCollector { TimeSeriesBucketCollector(BulkProcessor2 bulkProcessor, String[] dimensions) { this.bulkProcessor = bulkProcessor; + int dimensionFieldIndex = 0; + this.dimensionDownsamplers = new DimensionFieldDownsampler[fieldCounts.dimensionFields()]; int numericFieldIndex = 0; this.numericDownsamplers = new NumericMetricFieldDownsampler[fieldCounts.numericFields()]; int formattedValueFieldIndex = 0; @@ -398,10 +399,6 @@ private class TimeSeriesBucketCollector extends BucketCollector { this.aggregateCounterDownsamplers = fieldCounts.aggregateCounterFields() == 0 ? EMPTY_AGGREGATE_COUNTERS : new NumericMetricFieldDownsampler.AggregateCounter[fieldCounts.aggregateCounterFields()]; - int dimensionFieldIndex = 0; - DimensionFieldDownsampler[] dimensionDownsamplers = fieldCounts.aggregateCounterFields() == 0 - ? EMPTY_DIMENSIONS_FOR_COUNTER_RESETS - : new DimensionFieldDownsampler[fieldCounts.dimensionFields()]; for (AbstractFieldDownsampler fieldDownsampler : fieldDownsamplers) { switch (fieldDownsampler) { @@ -413,14 +410,13 @@ private class TimeSeriesBucketCollector extends BucketCollector { assert numericFieldIndex < numericDownsamplers.length; numericDownsamplers[numericFieldIndex++] = numericMetricDownsampler; } + case DimensionFieldDownsampler dimensionDownsampler -> { + assert dimensionFieldIndex < dimensionDownsamplers.length; + dimensionDownsamplers[dimensionFieldIndex++] = dimensionDownsampler; + } case LastValueFieldDownsampler lastValueDownsampler -> { assert formattedValueFieldIndex < formattedDocValuesDownsamplers.length; formattedDocValuesDownsamplers[formattedValueFieldIndex++] = lastValueDownsampler; - if (dimensionDownsamplers.length > 0 - && lastValueDownsampler instanceof DimensionFieldDownsampler dimensionFieldDownsampler) { - assert dimensionFieldIndex < dimensionDownsamplers.length; - dimensionDownsamplers[dimensionFieldIndex++] = dimensionFieldDownsampler; - } } case ExponentialHistogramFieldDownsampler exponentialHistogramDownsampler -> { assert exponentialHistogramFieldIndex < exponentialHistogramDownsamplers.length; @@ -457,6 +453,10 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag for (int i = 0; i < formattedDocValuesDownsamplers.length; i++) { formattedDocValues[i] = formattedDocValuesDownsamplers[i].getLeaf(ctx); } + var dimensionDocValues = new FormattedDocValues[dimensionDownsamplers.length]; + for (int i = 0; i < dimensionDownsamplers.length; i++) { + dimensionDocValues[i] = dimensionDownsamplers[i].getLeaf(ctx); + } var exponentialHistogramValues = new ExponentialHistogramValuesReader[exponentialHistogramDownsamplers.length]; for (int i = 0; i < exponentialHistogramDownsamplers.length; i++) { exponentialHistogramValues[i] = exponentialHistogramDownsamplers[i].getLeaf(ctx); @@ -475,6 +475,7 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag return new LeafDownsampleCollector( aggCtx, docCountProvider, + dimensionDocValues, numericValues, formattedDocValues, exponentialHistogramValues, @@ -494,6 +495,7 @@ class LeafDownsampleCollector extends LeafBucketCollector { final AggregationExecutionContext aggCtx; final DocCountProvider docCountProvider; + final FormattedDocValues[] dimensionDocValues; final SortedNumericDoubleValues[] numericValues; final FormattedDocValues[] formattedDocValues; final ExponentialHistogramValuesReader[] exponentialHistogramValues; @@ -507,6 +509,7 @@ class LeafDownsampleCollector extends LeafBucketCollector { LeafDownsampleCollector( AggregationExecutionContext aggCtx, DocCountProvider docCountProvider, + FormattedDocValues[] dimensionDocValues, SortedNumericDoubleValues[] numericValues, FormattedDocValues[] formattedDocValues, ExponentialHistogramValuesReader[] exponentialHistogramValues, @@ -516,6 +519,7 @@ class LeafDownsampleCollector extends LeafBucketCollector { ) { this.aggCtx = aggCtx; this.docCountProvider = docCountProvider; + this.dimensionDocValues = dimensionDocValues; this.numericValues = numericValues; this.formattedDocValues = formattedDocValues; this.exponentialHistogramValues = exponentialHistogramValues; @@ -592,6 +596,18 @@ void leafBulkCollection() throws IOException { collect(formattedDocValuesDownsamplers, formattedDocValues); collect(exponentialHistogramDownsamplers, exponentialHistogramValues); collect(tDigestHistogramDownsamplers, tDigestHistogramValues); + if (downsampleBucketBuilder.dimensionsCollected == false) { + assert dimensionDownsamplers.length == dimensionDocValues.length + : "Number of downsamplers [" + + dimensionDownsamplers.length + + "] does not match number of doc values [" + + dimensionDocValues.length + + "]"; + for (int i = 0; i < dimensionDownsamplers.length; i++) { + dimensionDownsamplers[i].collectOnce(dimensionDocValues[i], docIdBuffer); + } + downsampleBucketBuilder.dimensionsCollected = true; + } if (aggregateCounterDownsamplers.length > 0) { assert timestampValues != null; long[] timestamps = TimestampValueFetcher.fetch(timestampValues, docIdBuffer); @@ -694,6 +710,7 @@ private class DownsampleBucketBuilder { private long timestamp; private int docCount; private CounterResetDataPoints counterResetDataPoints; + private boolean dimensionsCollected = false; // A list of all the downsamplers so we can reset them before moving on to the next bucket private final List> fieldDownsamplers; // An array of field serializers, each field has one serializer which can group one or more AbstractFieldDownsamplers @@ -739,6 +756,11 @@ public void resetTsid(BytesRef tsid, int tsidOrd, long timestamp) { for (int i = 0; i < aggregateCounterDownsamplers.length; i++) { aggregateCounterDownsamplers[i].tsidReset(); } + // Reset dimension downsamplers + for (int i = 0; i < dimensionDownsamplers.length; i++) { + dimensionDownsamplers[i].tsidReset(); + } + dimensionsCollected = false; } /** diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsamplerTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsamplerTests.java index 68722ea265f6d..f3b60df025225 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsamplerTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DimensionFieldDownsamplerTests.java @@ -25,46 +25,54 @@ public class DimensionFieldDownsamplerTests extends ESTestCase { public void testKeywordDimension() throws IOException { DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); var docIdBuffer = IntArrayList.from(0, 1, 2); var values = createValuesInstance(docIdBuffer, new String[] { "aaa", "aaa", "aaa" }); - dimensionDownsampler.collect(values, docIdBuffer); - assertThat(dimensionDownsampler.lastValue(), equalTo("aaa")); + dimensionDownsampler.collectOnce(values, docIdBuffer); + assertThat(dimensionDownsampler.dimensionValue(), equalTo("aaa")); dimensionDownsampler.reset(); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), equalTo("aaa")); + dimensionDownsampler.tsidReset(); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); } public void testDoubleDimension() throws IOException { DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); var docIdBuffer = IntArrayList.from(0, 1, 2); var values = createValuesInstance(docIdBuffer, new Double[] { 10.20D, 10.20D, 10.20D }); - dimensionDownsampler.collect(values, docIdBuffer); - assertThat(dimensionDownsampler.lastValue(), equalTo(10.20D)); + dimensionDownsampler.collectOnce(values, docIdBuffer); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(10.20D)); dimensionDownsampler.reset(); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(10.20D)); + dimensionDownsampler.tsidReset(); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); } public void testIntegerDimension() throws IOException { DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); var docIdBuffer = IntArrayList.from(0, 1, 2); var values = createValuesInstance(docIdBuffer, new Integer[] { 10, 10, 10 }); - dimensionDownsampler.collect(values, docIdBuffer); - assertThat(dimensionDownsampler.lastValue(), equalTo(10)); + dimensionDownsampler.collectOnce(values, docIdBuffer); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(10)); dimensionDownsampler.reset(); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(10)); + dimensionDownsampler.tsidReset(); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); } public void testBooleanDimension() throws IOException { DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); var docIdBuffer = IntArrayList.from(0, 1, 2); var values = createValuesInstance(docIdBuffer, new Boolean[] { true, true, true }); - dimensionDownsampler.collect(values, docIdBuffer); - assertThat(dimensionDownsampler.lastValue(), equalTo(true)); + dimensionDownsampler.collectOnce(values, docIdBuffer); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(true)); dimensionDownsampler.reset(); - assertThat(dimensionDownsampler.lastValue(), nullValue()); + assertThat(dimensionDownsampler.dimensionValue(), equalTo(true)); + dimensionDownsampler.tsidReset(); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); } public void testMultiValueDimensions() throws IOException { @@ -92,9 +100,27 @@ public Object nextValue() { values.iterator = Arrays.stream(multiValue).iterator(); DimensionFieldDownsampler multiLastValueProducer = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); - assertThat(multiLastValueProducer.lastValue(), nullValue()); - multiLastValueProducer.collect(values, docIdBuffer); - assertThat(multiLastValueProducer.lastValue(), instanceOf(Object[].class)); - assertThat((Object[]) multiLastValueProducer.lastValue(), arrayContainingInAnyOrder(true, false)); + assertThat(multiLastValueProducer.dimensionValue(), nullValue()); + multiLastValueProducer.collectOnce(values, docIdBuffer); + assertThat(multiLastValueProducer.dimensionValue(), instanceOf(Object[].class)); + assertThat((Object[]) multiLastValueProducer.dimensionValue(), arrayContainingInAnyOrder(true, false)); } + + public void testCollectIsNotSupported() { + DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); + var docIdBuffer = IntArrayList.from(0, 1, 2); + var values = createValuesInstance(docIdBuffer, new String[] { "aaa", "aaa", "aaa" }); + expectThrows(UnsupportedOperationException.class, () -> dimensionDownsampler.collect(values, docIdBuffer)); + } + + public void testTwiceIsDiscouraged() throws IOException { + DimensionFieldDownsampler dimensionDownsampler = new DimensionFieldDownsampler(randomAlphanumericOfLength(10), null, null); + assertThat(dimensionDownsampler.dimensionValue(), nullValue()); + var docIdBuffer = IntArrayList.from(0, 1, 2); + var values = createValuesInstance(docIdBuffer, new String[] { "aaa", "aaa", "aaa" }); + dimensionDownsampler.collectOnce(values, docIdBuffer); + expectThrows(AssertionError.class, () -> dimensionDownsampler.collectOnce(values, docIdBuffer)); + } + }