Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.downsample;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.flattened.FlattenedFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.index.mapper.TimeSeriesParams.MetricType.POSITION;

/**
 * Common parent of all field downsamplers: a downsampler reads the values of one field from the source index
 * and produces the downsampled value for that field.
 *
 * @param <T> the type of the per-leaf doc values reader this downsampler collects from
 */
abstract class AbstractFieldDownsampler<T> implements DownsampleFieldSerializer {

    private final String name;
    // Starts as true; the constructor never collects anything.
    protected boolean isEmpty = true;
    protected final IndexFieldData<?> fieldData;

    AbstractFieldDownsampler(String name, IndexFieldData<?> fieldData) {
        this.name = name;
        this.fieldData = fieldData;
    }

    /**
     * @return the name of the field.
     */
    public String name() {
        return name;
    }

    /**
     * Puts the downsampler back into its empty state. This should be called before a field is downsampled for a new bucket.
     */
    public abstract void reset();

    /**
     * @return true if no value has been collected for the field.
     */
    public boolean isEmpty() {
        return isEmpty;
    }

    /**
     * @param context the leaf (segment) to read from
     * @return the leaf reader that will retrieve the doc values for this field.
     * @throws IOException if an I/O error occurs while obtaining the reader
     */
    public abstract T getLeaf(LeafReaderContext context) throws IOException;

    /**
     * Collects the values of this field for the requested doc ids.
     * @param docValues the doc values for this field
     * @param docIdBuffer the doc ids for which the field values need to be retrieved
     * @throws IOException if an I/O error occurs while reading the doc values
     */
    public abstract void collect(T docValues, IntArrayList docIdBuffer) throws IOException;

    /**
     * Creates the field downsamplers for the provided list of fields.
     *
     * @param context           the search execution context used to resolve field types and field data
     * @param fields            the names of the fields to downsample
     * @param multiFieldSources maps a field name to the field whose type should be looked up instead;
     *                          fields without an entry are looked up under their own name
     * @param samplingMethod    the sampling method handed to every created downsampler
     * @return an unmodifiable list of downsamplers
     */
    static List<AbstractFieldDownsampler<?>> create(
        SearchExecutionContext context,
        String[] fields,
        Map<String, String> multiFieldSources,
        DownsampleConfig.SamplingMethod samplingMethod
    ) {
        final List<AbstractFieldDownsampler<?>> result = new ArrayList<>();
        for (String field : fields) {
            final String sourceField = multiFieldSources.getOrDefault(field, field);
            final MappedFieldType fieldType = context.getFieldType(sourceField);
            assert fieldType != null : "Unknown field type for field: [" + sourceField + "]";

            // Aggregate metric double fields expand into one downsampler per sub-metric.
            if (fieldType instanceof AggregateMetricDoubleFieldMapper.AggregateMetricDoubleFieldType aggMetricFieldType) {
                result.addAll(AggregateMetricDoubleFieldDownsampler.create(context, aggMetricFieldType, samplingMethod));
                continue;
            }
            // Fields that do not exist in the index get no downsampler.
            if (context.fieldExistsInIndex(field) == false) {
                continue;
            }
            // For a root flattened field, the field data is loaded through its keyed field type.
            final MappedFieldType fieldDataType = fieldType instanceof FlattenedFieldMapper.RootFlattenedFieldType flattenedFieldType
                ? flattenedFieldType.getKeyedFieldType()
                : fieldType;
            final IndexFieldData<?> fieldData = context.getForField(fieldDataType, MappedFieldType.FielddataOperation.SEARCH);
            result.add(create(field, fieldType, fieldData, samplingMethod));
        }
        return Collections.unmodifiableList(result);
    }

    /**
     * Creates a single field downsampler for the provided field. The specialised downsamplers are tried in a
     * fixed order; a field none of them supports is downsampled as a label.
     *
     * @throws IllegalArgumentException if the field is not supported by a specialised downsampler and its metric type is position
     */
    private static AbstractFieldDownsampler<?> create(
        String fieldName,
        MappedFieldType fieldType,
        IndexFieldData<?> fieldData,
        DownsampleConfig.SamplingMethod samplingMethod
    ) {
        assert AggregateMetricDoubleFieldDownsampler.supportsFieldType(fieldType) == false
            : "Aggregate metric double should be handled by a dedicated downsampler";
        if (TDigestHistogramFieldDownsampler.supportsFieldType(fieldType)) {
            return TDigestHistogramFieldDownsampler.create(fieldName, fieldType, fieldData, samplingMethod);
        } else if (ExponentialHistogramFieldDownsampler.supportsFieldType(fieldType)) {
            return ExponentialHistogramFieldDownsampler.create(fieldName, fieldData, samplingMethod);
        } else if (NumericMetricFieldDownsampler.supportsFieldType(fieldType)) {
            return NumericMetricFieldDownsampler.create(fieldName, fieldType, fieldData, samplingMethod);
        } else if (fieldType.getMetricType() == POSITION) {
            // TODO: Support POSITION in downsampling
            throw new IllegalArgumentException("Unsupported metric type [position] for downsampling");
        }
        // Anything that is not a metric is downsampled as a label
        return LastValueFieldDownsampler.create(fieldName, fieldType, fieldData);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,47 @@
package org.elasticsearch.xpack.downsample;

import org.apache.lucene.internal.hppc.IntArrayList;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateMetricDoubleFieldMapper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import static org.elasticsearch.xpack.downsample.NumericMetricFieldProducer.MAX_NO_VALUE;
import static org.elasticsearch.xpack.downsample.NumericMetricFieldProducer.MIN_NO_VALUE;
import java.util.List;

/**
* A producer that can be used for downsampling aggregate metric double fields whether is a metric or a label. We need a separate producer
* for each a sub-metric of an aggregate metric double, this means to downsample an aggregate metric double we will need 4.
* A downsampler that can be used for downsampling aggregate metric double fields, whether the field is a metric or a label. We need a
* separate downsampler for each sub-metric of an aggregate metric double; this means that to downsample an aggregate metric double we will need 4.
* This is mainly used when downsampling already downsampled indices.
*/
abstract class AggregateMetricDoubleFieldProducer extends AbstractDownsampleFieldProducer<SortedNumericDoubleValues> {
abstract sealed class AggregateMetricDoubleFieldDownsampler extends NumericMetricFieldDownsampler {

protected final AggregateMetricDoubleFieldMapper.Metric metric;

AggregateMetricDoubleFieldProducer(String name, AggregateMetricDoubleFieldMapper.Metric metric) {
super(name);
AggregateMetricDoubleFieldDownsampler(String name, AggregateMetricDoubleFieldMapper.Metric metric, IndexFieldData<?> fieldData) {
super(name, fieldData);
this.metric = metric;
}

static final class Aggregate extends AggregateMetricDoubleFieldProducer {
public static boolean supportsFieldType(MappedFieldType fieldType) {
return AggregateMetricDoubleFieldMapper.CONTENT_TYPE.equals(fieldType.typeName());
}

static final class Aggregate extends AggregateMetricDoubleFieldDownsampler {

private double max = MAX_NO_VALUE;
private double min = MIN_NO_VALUE;
private final CompensatedSum sum = new CompensatedSum();
private long count;

Aggregate(String name, AggregateMetricDoubleFieldMapper.Metric metric) {
super(name, metric);
Aggregate(String name, AggregateMetricDoubleFieldMapper.Metric metric, IndexFieldData<?> fieldData) {
super(name, metric, fieldData);
}

@Override
Expand All @@ -59,7 +66,6 @@ public void collect(SortedNumericDoubleValues docValues, IntArrayList docIdBuffe
case min -> min = Math.min(value, min);
case max -> max = Math.max(value, max);
case sum -> sum.add(value);
// This is the reason why we can't use GaugeMetricFieldProducer
// For downsampled indices aggregate metric double's value count field needs to be summed.
// (Note: not using CompensatedSum here should be ok given that value_count is mapped as long)
case value_count -> count += Math.round(value);
Expand Down Expand Up @@ -93,13 +99,13 @@ public void write(XContentBuilder builder) throws IOException {
/**
* Important note: This class assumes that field values are collected and sorted by descending order by time
*/
static final class LastValue extends AggregateMetricDoubleFieldProducer {
static final class LastValue extends AggregateMetricDoubleFieldDownsampler {

private final boolean supportsMultiValue;
private Object lastValue = null;

LastValue(String name, AggregateMetricDoubleFieldMapper.Metric metric, boolean supportsMultiValue) {
super(name, metric);
LastValue(String name, AggregateMetricDoubleFieldMapper.Metric metric, IndexFieldData<?> fieldData, boolean supportsMultiValue) {
super(name, metric, fieldData);
this.supportsMultiValue = supportsMultiValue;
}

Expand Down Expand Up @@ -145,21 +151,21 @@ public void write(XContentBuilder builder) throws IOException {
}

/**
* We use a specialised serializer because we are combining all the available submetric producers.
* We use a specialised serializer because we are combining all the available submetric downsamplers.
*/
static class Serializer implements DownsampleFieldSerializer {
private final Collection<AbstractDownsampleFieldProducer<?>> producers;
private final Collection<AbstractFieldDownsampler<?>> downsamplers;
private final String name;

/**
* @param name the name of the aggregate_metric_double field as it will be serialized
* in the downsampled index
* @param producers a collection of {@link AggregateMetricDoubleFieldProducer} instances with the subfields
* @param downsamplers a collection of {@link AggregateMetricDoubleFieldDownsampler} instances with the subfields
* of the aggregate_metric_double field.
*/
Serializer(String name, Collection<AbstractDownsampleFieldProducer<?>> producers) {
Serializer(String name, Collection<AbstractFieldDownsampler<?>> downsamplers) {
this.name = name;
this.producers = producers;
this.downsamplers = downsamplers;
}

@Override
Expand All @@ -169,28 +175,65 @@ public void write(XContentBuilder builder) throws IOException {
}

builder.startObject(name);
for (AbstractDownsampleFieldProducer<?> fieldProducer : producers) {
assert name.equals(fieldProducer.name()) : "producer has a different name";
if (fieldProducer.isEmpty()) {
for (AbstractFieldDownsampler<?> fieldDownsampler : downsamplers) {
assert name.equals(fieldDownsampler.name()) : "downsampler has a different name";
if (fieldDownsampler.isEmpty()) {
continue;
}
if (fieldProducer instanceof AggregateMetricDoubleFieldProducer == false) {
if (fieldDownsampler instanceof AggregateMetricDoubleFieldDownsampler == false) {
throw new IllegalStateException(
"Unexpected field producer class: " + fieldProducer.getClass().getSimpleName() + " for " + name + " field"
"Unexpected field downsampler class: " + fieldDownsampler.getClass().getSimpleName() + " for " + name + " field"
);
}
fieldProducer.write(builder);
fieldDownsampler.write(builder);
}
builder.endObject();
}

private boolean isEmpty() {
for (AbstractDownsampleFieldProducer<?> p : producers) {
if (p.isEmpty() == false) {
for (AbstractFieldDownsampler<?> d : downsamplers) {
if (d.isEmpty() == false) {
return false;
}
}
return true;
}
}

/**
 * Creates one downsampler per sub-metric of an aggregate_metric_double field. This is usually a downsample-of-downsample case.
 * Sub-metric fields that do not exist in the index are skipped.
 *
 * @param context            the search execution context used to check field existence and load field data
 * @param aggMetricFieldType the aggregate_metric_double field type whose sub-metric fields are loaded
 * @param samplingMethod     the sampling method handed to every created downsampler
 * @return a list with one downsampler for each sub-metric field that exists in the index
 */
static List<AggregateMetricDoubleFieldDownsampler> create(
    SearchExecutionContext context,
    AggregateMetricDoubleFieldMapper.AggregateMetricDoubleFieldType aggMetricFieldType,
    DownsampleConfig.SamplingMethod samplingMethod
) {
    final List<AggregateMetricDoubleFieldDownsampler> result = new ArrayList<>();
    for (var entry : aggMetricFieldType.getMetricFields().entrySet()) {
        var subField = entry.getValue();
        if (context.fieldExistsInIndex(subField.name()) == false) {
            continue;
        }
        IndexFieldData<?> subFieldData = context.getForField(subField, MappedFieldType.FielddataOperation.SEARCH);
        result.add(create(aggMetricFieldType, entry.getKey(), subFieldData, samplingMethod));
    }
    return result;
}

/**
 * Creates the downsampler for a single sub-metric of an aggregate_metric_double field.
 * A field without a metric type always gets a last-value downsampler with multi-value support enabled;
 * otherwise the sampling method selects between the aggregating and the last-value implementation.
 *
 * @param fieldType      the aggregate_metric_double field type; its name becomes the downsampler name
 * @param metric         the sub-metric this downsampler handles
 * @param fieldData      the field data of the sub-metric field
 * @param samplingMethod the sampling method to apply when the field has a metric type
 */
private static AggregateMetricDoubleFieldDownsampler create(
    AggregateMetricDoubleFieldMapper.AggregateMetricDoubleFieldType fieldType,
    AggregateMetricDoubleFieldMapper.Metric metric,
    IndexFieldData<?> fieldData,
    DownsampleConfig.SamplingMethod samplingMethod
) {
    final String fieldName = fieldType.name();
    final boolean hasMetricType = fieldType.getMetricType() != null;
    if (hasMetricType == false) {
        return new AggregateMetricDoubleFieldDownsampler.LastValue(fieldName, metric, fieldData, true);
    }
    return switch (samplingMethod) {
        case AGGREGATE -> new AggregateMetricDoubleFieldDownsampler.Aggregate(fieldName, metric, fieldData);
        case LAST_VALUE -> new AggregateMetricDoubleFieldDownsampler.LastValue(fieldName, metric, fieldData, false);
    };
}
}
Loading