diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/mapper/EncodedTDigest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/mapper/EncodedTDigest.java index 9595eb35c5e91..1685c4ff8e996 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/mapper/EncodedTDigest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/mapper/EncodedTDigest.java @@ -8,8 +8,10 @@ package org.elasticsearch.xpack.core.analytics.mapper; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tdigest.Centroid; import org.elasticsearch.tdigest.TDigestReadView; @@ -17,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; /** @@ -30,6 +33,12 @@ */ public final class EncodedTDigest implements TDigestReadView { + /** + * The size of a single EncodedTDigest instance in bytes, excluding the underlying encoded digest bytes array. + */ + public static final long RAM_BYTES = RamUsageEstimator.shallowSizeOfInstance(EncodedTDigest.class) + RamUsageEstimator + .shallowSizeOfInstance(BytesRef.class); + private final BytesRef encodedDigest = new BytesRef(); private long cachedSize = -1L; private double cachedMax = Double.NaN; @@ -73,32 +82,47 @@ public CentroidIterator centroidIterator() { /** * Encodes the provided centroids into a {@link BytesRef}. 
*/ - public static BytesRef encodeCentroids(List centroids) { - return encodeCentroidsFromIterator(new CentroidIterator() { - private int index = -1; + public static BytesRef encodeCentroids(Collection centroids) { + try (BytesStreamOutput out = new BytesStreamOutput()) { + encodeCentroids(centroids, out); + return out.bytes().toBytesRef(); + } catch (IOException e) { + throw new IllegalStateException("Failed to encode centroid", e); + } + } + + public static void encodeCentroids(Collection centroids, StreamOutput out) throws IOException { + encodeCentroidsFromIterator(new CentroidIterator() { + private final Iterator iterator = centroids.iterator(); + Centroid centroid; @Override public boolean next() { - index++; - return index < centroids.size(); + if (iterator.hasNext() == false) { + return false; + } + centroid = iterator.next(); + return true; } @Override public long currentCount() { - return centroids.get(index).count(); + assert centroid != null : "next() must be called and return true before accessing current centroid"; + return centroid.count(); } @Override public double currentMean() { - return centroids.get(index).mean(); + assert centroid != null : "next() must be called and return true before accessing current centroid"; + return centroid.mean(); } @Override public boolean hasNext() { - return index + 1 < centroids.size(); + return iterator.hasNext(); } - }); + }, out); } /** @@ -106,30 +130,37 @@ public boolean hasNext() { */ public static BytesRef encodeCentroids(List means, List counts) { assert means.size() == counts.size() : "centroids and counts must have equal size"; - return encodeCentroidsFromIterator(new CentroidIterator() { - private int index = -1; + try (BytesStreamOutput out = new BytesStreamOutput()) { + encodeCentroidsFromIterator(new CentroidIterator() { + private int index = -1; - @Override - public boolean next() { - index++; - return index < means.size(); - } + @Override + public boolean next() { + index++; + return index < 
means.size(); + } - @Override - public long currentCount() { - return counts.get(index); - } + @Override + public long currentCount() { + assert index >= 0 : "next() must be called and return true before accessing current centroid"; + return counts.get(index); + } - @Override - public double currentMean() { - return means.get(index); - } + @Override + public double currentMean() { + assert index >= 0 : "next() must be called and return true before accessing current centroid"; + return means.get(index); + } - @Override - public boolean hasNext() { - return index + 1 < means.size(); - } - }); + @Override + public boolean hasNext() { + return index + 1 < means.size(); + } + }, out); + return out.bytes().toBytesRef(); + } catch (IOException e) { + throw new IllegalStateException("Failed to encode centroid", e); + } } @Override @@ -172,21 +203,16 @@ public Collection centroids() { return Collections.unmodifiableList(decoded); } - private static BytesRef encodeCentroidsFromIterator(CentroidIterator centroids) { - try (BytesStreamOutput out = new BytesStreamOutput()) { - while (centroids.next()) { - long count = centroids.currentCount(); - if (count < 0) { - throw new IllegalArgumentException("Centroid count cannot be negative: " + count); - } - if (count > 0) { - out.writeVLong(count); - out.writeDouble(centroids.currentMean()); - } + private static void encodeCentroidsFromIterator(CentroidIterator centroids, StreamOutput out) throws IOException { + while (centroids.next()) { + long count = centroids.currentCount(); + if (count < 0) { + throw new IllegalArgumentException("Centroid count cannot be negative: " + count); + } + if (count > 0) { + out.writeVLong(count); + out.writeDouble(centroids.currentMean()); } - return out.bytes().toBytesRef(); - } catch (IOException e) { - throw new IllegalStateException("Failed to encode centroid", e); } } diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java 
b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java index 4ea3a422822f0..a9219170f70bb 100644 --- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java +++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Types.java @@ -166,7 +166,7 @@ public static TypeDef of(TypeName type, String alias, String block, String vecto TypeDef.of(TypeName.DOUBLE, "DOUBLE", "DoubleBlock", "DoubleVector", null), TypeDef.of(BYTES_REF, "BYTES_REF", "BytesRefBlock", "BytesRefVector", BYTES_REF), TypeDef.of(EXPONENTIAL_HISTOGRAM, "EXPONENTIAL_HISTOGRAM", "ExponentialHistogramBlock", null, EXPONENTIAL_HISTOGRAM_SCRATCH), - TypeDef.of(TDIGEST, "TDIGEST", "TDigestBlock", null, null) + TypeDef.of(TDIGEST, "TDIGEST", "TDigestBlock", null, TDIGEST) ) .flatMap(def -> Stream.of(def.type.toString(), def.type + "[]", def.alias).map(alias -> Map.entry(alias, def))) .collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunction.java index 4b34dabc63a8f..6c87519c95b8b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunction.java @@ -76,6 +76,7 @@ private void addRawInputNotMasked(Page page) { } private void addRawBlock(TDigestBlock valueBlock) { + TDigestHolder valueScratch = new TDigestHolder(); for (int p = 0; p < valueBlock.getPositionCount(); p++) { int valueValueCount = valueBlock.getValueCount(p); if (valueValueCount == 0) { @@ -84,13 +85,14 @@ private void addRawBlock(TDigestBlock valueBlock) { int valueStart = 
valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueValueCount; for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { - TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset); + TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset, valueScratch); HistogramMergeTDigestAggregator.combine(state, valueValue); } } } private void addRawBlock(TDigestBlock valueBlock, BooleanVector mask) { + TDigestHolder valueScratch = new TDigestHolder(); for (int p = 0; p < valueBlock.getPositionCount(); p++) { if (mask.getBoolean(p) == false) { continue; @@ -102,7 +104,7 @@ private void addRawBlock(TDigestBlock valueBlock, BooleanVector mask) { int valueStart = valueBlock.getFirstValueIndex(p); int valueEnd = valueStart + valueValueCount; for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { - TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset); + TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset, valueScratch); HistogramMergeTDigestAggregator.combine(state, valueValue); } } @@ -124,7 +126,8 @@ public void addIntermediateInput(Page page) { } BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); assert seen.getPositionCount() == 1; - HistogramMergeTDigestAggregator.combineIntermediate(state, value.getTDigestHolder(value.getFirstValueIndex(0)), seen.getBoolean(0)); + TDigestHolder valueScratch = new TDigestHolder(); + HistogramMergeTDigestAggregator.combineIntermediate(state, value.getTDigestHolder(value.getFirstValueIndex(0), valueScratch), seen.getBoolean(0)); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunction.java index 2398a4eea22fb..7af823fa05f11 100644 --- 
a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunction.java @@ -85,6 +85,7 @@ public void close() { } private void addRawInput(int positionOffset, IntArrayBlock groups, TDigestBlock valueBlock) { + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { continue; @@ -100,7 +101,7 @@ private void addRawInput(int positionOffset, IntArrayBlock groups, TDigestBlock int valueStart = valueBlock.getFirstValueIndex(valuesPosition); int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { - TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset); + TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset, valueScratch); HistogramMergeTDigestAggregator.combine(state, groupId, valueValue); } } @@ -122,6 +123,7 @@ public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page } BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); assert value.getPositionCount() == seen.getPositionCount(); + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { continue; @@ -131,12 +133,13 @@ public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page for (int g = groupStart; g < groupEnd; g++) { int groupId = groups.getInt(g); int valuesPosition = groupPosition + positionOffset; - HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition)), seen.getBoolean(valuesPosition)); + 
HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition), valueScratch), seen.getBoolean(valuesPosition)); } } } private void addRawInput(int positionOffset, IntBigArrayBlock groups, TDigestBlock valueBlock) { + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { continue; @@ -152,7 +155,7 @@ private void addRawInput(int positionOffset, IntBigArrayBlock groups, TDigestBlo int valueStart = valueBlock.getFirstValueIndex(valuesPosition); int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { - TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset); + TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset, valueScratch); HistogramMergeTDigestAggregator.combine(state, groupId, valueValue); } } @@ -174,6 +177,7 @@ public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Pa } BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); assert value.getPositionCount() == seen.getPositionCount(); + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { if (groups.isNull(groupPosition)) { continue; @@ -183,12 +187,13 @@ public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Pa for (int g = groupStart; g < groupEnd; g++) { int groupId = groups.getInt(g); int valuesPosition = groupPosition + positionOffset; - HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition)), seen.getBoolean(valuesPosition)); + HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition), valueScratch), 
seen.getBoolean(valuesPosition)); } } } private void addRawInput(int positionOffset, IntVector groups, TDigestBlock valueBlock) { + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int valuesPosition = groupPosition + positionOffset; if (valueBlock.isNull(valuesPosition)) { @@ -198,7 +203,7 @@ private void addRawInput(int positionOffset, IntVector groups, TDigestBlock valu int valueStart = valueBlock.getFirstValueIndex(valuesPosition); int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition); for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) { - TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset); + TDigestHolder valueValue = valueBlock.getTDigestHolder(valueOffset, valueScratch); HistogramMergeTDigestAggregator.combine(state, groupId, valueValue); } } @@ -219,10 +224,11 @@ public void addIntermediateInput(int positionOffset, IntVector groups, Page page } BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); assert value.getPositionCount() == seen.getPositionCount(); + TDigestHolder valueScratch = new TDigestHolder(); for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { int groupId = groups.getInt(groupPosition); int valuesPosition = groupPosition + positionOffset; - HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition)), seen.getBoolean(valuesPosition)); + HistogramMergeTDigestAggregator.combineIntermediate(state, groupId, value.getTDigestHolder(value.getFirstValueIndex(valuesPosition), valueScratch), seen.getBoolean(valuesPosition)); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java index 74d110182237e..26cc5302db422 100644 --- 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java @@ -10,7 +10,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; -import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -18,8 +17,10 @@ import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.core.Releasables; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; import org.elasticsearch.search.aggregations.metrics.TDigestExecutionHint; -import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.tdigest.TDigest; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import java.util.function.DoubleBinaryOperator; @@ -41,19 +42,27 @@ private static double nanAwareAgg(double v1, double v2, DoubleBinaryOperator op) return op.applyAsDouble(v1, v2); } + private static TDigestHolder asTDigestHolder(TDigest tdigest, double sum, double min, double max) { + // TODO: replace with BreakingTDigestHolder when added for proper memory accounting and reuse + TDigestHolder holder = new TDigestHolder(); + holder.reset(EncodedTDigest.encodeCentroids(tdigest.centroids()), min, max, sum, tdigest.size()); + return holder; + } + static final class SingleState implements AggregatorState { private final CircuitBreaker breaker; + private final MemoryTrackingTDigestArrays tdigestArrays; // initialize lazily - private TDigestState merger; + private TDigest merger; double sum = Double.NaN; double min = Double.NaN; double max = Double.NaN; - long count = 0; SingleState(CircuitBreaker breaker) { 
this.breaker = breaker; + this.tdigestArrays = new MemoryTrackingTDigestArrays(breaker); } public void add(TDigestHolder histogram) { @@ -61,11 +70,10 @@ public void add(TDigestHolder histogram) { return; } if (merger == null) { - merger = TDigestState.createOfType(breaker, TDigestState.Type.MERGING, COMPRESSION); + merger = TDigest.createMergingDigest(tdigestArrays, COMPRESSION); } - histogram.addTo(merger); + merger.add(histogram); sum = nanAwareAgg(histogram.getSum(), sum, Double::sum); - count += histogram.getValueCount(); min = nanAwareAgg(histogram.getMin(), min, Double::min); max = nanAwareAgg(histogram.getMax(), max, Double::max); } @@ -79,8 +87,7 @@ public void toIntermediate(Block[] blocks, int offset, DriverContext driverConte blocks[offset] = blockFactory.newConstantTDigestBlock(TDigestHolder.empty(), 1); blocks[offset + 1] = blockFactory.newConstantBooleanBlockWith(false, 1); } else { - TDigestHolder resultHolder = new TDigestHolder(merger, min, max, sum, count); - blocks[offset] = blockFactory.newConstantTDigestBlock(resultHolder, 1); + blocks[offset] = blockFactory.newConstantTDigestBlock(asTDigestHolder(merger, sum, min, max), 1); blocks[offset + 1] = blockFactory.newConstantBooleanBlockWith(true, 1); } } @@ -96,52 +103,49 @@ public Block evaluateFinal(DriverContext driverContext) { if (merger == null) { return blockFactory.newConstantNullBlock(1); } else { - TDigestHolder resultHolder = new TDigestHolder(merger, min, max, sum, count); - return blockFactory.newConstantTDigestBlock(resultHolder, 1); + return blockFactory.newConstantTDigestBlock(asTDigestHolder(merger, sum, min, max), 1); } } } static final class GroupingState implements GroupingAggregatorState { - private ObjectArray states; + private ObjectArray states; private DoubleArray minima; private DoubleArray maxima; private DoubleArray sums; - private LongArray counts; private final CircuitBreaker breaker; private final BigArrays bigArrays; + private final MemoryTrackingTDigestArrays 
tdigestArrays; GroupingState(BigArrays bigArrays, CircuitBreaker breaker) { this.bigArrays = bigArrays; this.breaker = breaker; - ObjectArray states = null; + this.tdigestArrays = new MemoryTrackingTDigestArrays(breaker); + ObjectArray states = null; DoubleArray minima = null; DoubleArray maxima = null; DoubleArray sums = null; - LongArray counts = null; boolean success = false; try { states = bigArrays.newObjectArray(1); minima = bigArrays.newDoubleArray(1); maxima = bigArrays.newDoubleArray(1); sums = bigArrays.newDoubleArray(1); - counts = bigArrays.newLongArray(1); success = true; } finally { if (success == false) { - Releasables.close(states, minima, maxima, sums, counts); + Releasables.close(states, minima, maxima, sums); } } this.states = states; this.minima = minima; this.maxima = maxima; this.sums = sums; - this.counts = counts; } - TDigestState getOrNull(int position) { + TDigest getOrNull(int position) { if (position < states.size()) { return states.get(position); } else { @@ -158,25 +162,21 @@ public void add(int groupId, TDigestHolder histogram) { double min; double max; double sum; - long count; if (state == null) { - state = TDigestState.createOfType(breaker, TDigestState.Type.MERGING, COMPRESSION); + state = TDigest.createMergingDigest(tdigestArrays, COMPRESSION); states.set(groupId, state); min = Double.NaN; max = Double.NaN; sum = Double.NaN; - count = 0L; } else { min = minima.get(groupId); max = maxima.get(groupId); sum = sums.get(groupId); - count = counts.get(groupId); } - histogram.addTo(state); + state.add(histogram); minima.set(groupId, nanAwareAgg(min, histogram.getMin(), Double::min)); maxima.set(groupId, nanAwareAgg(max, histogram.getMax(), Double::max)); sums.set(groupId, nanAwareAgg(sum, histogram.getSum(), Double::sum)); - counts.set(groupId, count + histogram.getValueCount()); } private void ensureCapacity(int groupId) { @@ -184,7 +184,6 @@ private void ensureCapacity(int groupId) { minima = bigArrays.grow(minima, groupId + 1); 
maxima = bigArrays.grow(maxima, groupId + 1); sums = bigArrays.grow(sums, groupId + 1); - counts = bigArrays.grow(counts, groupId + 1); } @Override @@ -196,12 +195,10 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive ) { for (int i = 0; i < selected.getPositionCount(); i++) { int groupId = selected.getInt(i); - TDigestState state = getOrNull(groupId); + TDigest state = getOrNull(groupId); if (state != null) { seenBuilder.appendBoolean(true); - histoBuilder.appendTDigest( - new TDigestHolder(state, minima.get(groupId), maxima.get(groupId), sums.get(groupId), counts.get(groupId)) - ); + histoBuilder.appendTDigest(asTDigestHolder(state, sums.get(groupId), minima.get(groupId), maxima.get(groupId))); } else { seenBuilder.appendBoolean(false); histoBuilder.appendTDigest(TDigestHolder.empty()); @@ -213,14 +210,12 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive } public Block evaluateFinal(IntVector selected, DriverContext driverContext) { - try (var histoBuilder = driverContext.blockFactory().newTDigestBlockBuilder(selected.getPositionCount());) { + try (var histoBuilder = driverContext.blockFactory().newTDigestBlockBuilder(selected.getPositionCount())) { for (int i = 0; i < selected.getPositionCount(); i++) { int groupId = selected.getInt(i); - TDigestState state = getOrNull(groupId); + TDigest state = getOrNull(groupId); if (state != null) { - histoBuilder.appendTDigest( - new TDigestHolder(state, minima.get(groupId), maxima.get(groupId), sums.get(groupId), counts.get(groupId)) - ); + histoBuilder.appendTDigest(asTDigestHolder(state, sums.get(groupId), minima.get(groupId), maxima.get(groupId))); } else { histoBuilder.appendNull(); } @@ -234,7 +229,7 @@ public void close() { for (int i = 0; i < states.size(); i++) { Releasables.close(states.get(i)); } - Releasables.close(states, minima, maxima, sums, counts); + Releasables.close(states, minima, maxima, sums); states = null; } diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java index fb3224736d9c7..27ac1356c3f1e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/BlockUtils.java @@ -329,14 +329,17 @@ yield new AggregateMetricDoubleLiteral( case TDIGEST -> { TDigestBlock tDigestBlock = (TDigestBlock) block; // return a copy so that the returned value is not bound to the lifetime of the block - TDigestHolder blockBacked = tDigestBlock.getTDigestHolder(offset); - yield new TDigestHolder( + TDigestHolder blockBacked = new TDigestHolder(); + blockBacked = tDigestBlock.getTDigestHolder(offset, blockBacked); + TDigestHolder copy = new TDigestHolder(); + copy.reset( BytesRef.deepCopyOf(blockBacked.getEncodedDigest()), blockBacked.getMin(), blockBacked.getMax(), blockBacked.getSum(), - blockBacked.getValueCount() + blockBacked.size() ); + yield copy; } case LONG_RANGE -> { LongRangeBlock b = (LongRangeBlock) block; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index 833f2ea3f4a63..7d2222055a8a4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -319,7 +319,7 @@ public void serializeTDigest(int valueIndex, SerializedTDigestOutput out, BytesR } @Override - public TDigestHolder getTDigestHolder(int valueIndex) { + public TDigestHolder getTDigestHolder(int valueIndex, TDigestHolder scratch) { assert false : "null block"; throw new UnsupportedOperationException("null block"); } diff --git 
a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java index f4a4842f42d03..985b0cdd68136 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestArrayBlock.java @@ -248,15 +248,14 @@ public void serializeTDigest(int valueIndex, SerializedTDigestOutput out, BytesR } @Override - public TDigestHolder getTDigestHolder(int offset) { - return new TDigestHolder( - // TODO: Memory tracking? creating a new bytes ref here doesn't seem great - encodedDigests.getBytesRef(encodedDigests.getFirstValueIndex(offset), new BytesRef()), - minima.isNull(offset) ? Double.NaN : minima.getDouble(minima.getFirstValueIndex(offset)), - maxima.isNull(offset) ? Double.NaN : maxima.getDouble(maxima.getFirstValueIndex(offset)), - sums.isNull(offset) ? Double.NaN : sums.getDouble(sums.getFirstValueIndex(offset)), - valueCounts.getLong(valueCounts.getFirstValueIndex(offset)) - ); + public TDigestHolder getTDigestHolder(int offset, TDigestHolder scratch) { + var encoded = encodedDigests.getBytesRef(encodedDigests.getFirstValueIndex(offset), scratch.scratchBytesRef()); + double min = minima.isNull(offset) ? Double.NaN : minima.getDouble(minima.getFirstValueIndex(offset)); + double max = maxima.isNull(offset) ? Double.NaN : maxima.getDouble(maxima.getFirstValueIndex(offset)); + double sum = sums.isNull(offset) ? 
Double.NaN : sums.getDouble(sums.getFirstValueIndex(offset)); + long valueCount = valueCounts.getLong(valueCounts.getFirstValueIndex(offset)); + scratch.reset(encoded, min, max, sum, valueCount); + return scratch; } public static TDigestBlock createConstant(TDigestHolder histogram, int positionCount, BlockFactory blockFactory) { @@ -267,7 +266,7 @@ public static TDigestBlock createConstant(TDigestHolder histogram, int positionC BytesRefBlock encodedDigestsBlock = null; boolean success = false; try { - countBlock = blockFactory.newConstantLongBlockWith(histogram.getValueCount(), positionCount); + countBlock = blockFactory.newConstantLongBlockWith(histogram.size(), positionCount); if (Double.isNaN(histogram.getMin())) { minBlock = (DoubleBlock) blockFactory.newConstantNullBlock(positionCount); } else { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java index 46e21b550388b..a08c0aa8c6074 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlock.java @@ -43,7 +43,7 @@ sealed interface Builder extends Block.Builder, BlockLoader.TDigestBuilder permi TDigestBlock build(); } - TDigestHolder getTDigestHolder(int offset); + TDigestHolder getTDigestHolder(int offset, TDigestHolder scratch); interface SerializedTDigestOutput { void appendDouble(double value); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java index 109ae3a3c6f5e..977f7bafc8f5d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java +++ 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestBlockBuilder.java @@ -176,7 +176,7 @@ public void appendTDigest(TDigestHolder val) { } else { sumsBuilder.appendDouble(val.getSum()); } - valueCountsBuilder.appendLong(val.getValueCount()); + valueCountsBuilder.appendLong(val.size()); } public void deserializeAndAppend(TDigestBlock.SerializedTDigestInput input) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java index bfe20c2f4056c..5a7ccbc3f4722 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java @@ -8,45 +8,48 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.TransportVersion; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.ByteArrayStreamInput; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.GenericNamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.elasticsearch.tdigest.Centroid; +import org.elasticsearch.tdigest.TDigestReadView; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; -import org.elasticsearch.xpack.core.analytics.mapper.TDigestParser; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; +import java.util.Collection; import java.util.Objects; 
/** - * This exists to hold the values from a {@link TDigestBlock}. It is roughly parallel to - * {@link org.elasticsearch.search.aggregations.metrics.TDigestState} in classic aggregations, which we are not using directly because - * the serialization format is pretty bad for ESQL's use case (specifically, encoding the near-constant compression and merge strategy - * data inline as opposed to in a dedicated column isn't great). - * - * This is writable to support ESQL literals of this type, even though those should not exist. Literal support, and thus a writeable - * object here, are required for ESQL testing. See for example ShowExecSerializationTest. + * This is a {@link TDigestReadView} annotated with some extra information: The sum, min and max of all observations. + * In addition, it stores the size as a dedicated field for faster access. + * The TDigest is represented a list of centroids and their counts, encoded in a byte array. + * This class does not own the underlying memory used to store the digest, it is merely a pointer/accessor for e.g. + * a single value in a {@link TDigestBlock}. + *
+ * This class supports serialization, but this is only intended for use in ES|QL Literals, as it uses untracked memory on deserialization. */ -public class TDigestHolder implements GenericNamedWriteable { - - private static final TDigestHolder EMPTY; - static { - try { - EMPTY = new TDigestHolder(encodeCentroidsAndCounts(List.of(), List.of()), Double.NaN, Double.NaN, Double.NaN, 0L); - } catch (IOException e) { - throw new IllegalStateException(e); +public class TDigestHolder implements GenericNamedWriteable, TDigestReadView { + + /** + * The size of a single TDigestHolder instance in bytes, excluding the underlying encoded digest bytes array. + * The encoded digest bytes are not owned by this class, so they are not included in the accounting. + */ + static final long RAM_BYTES = RamUsageEstimator.shallowSizeOfInstance(TDigestHolder.class) + RamUsageEstimator.shallowSizeOfInstance( + BytesRef.class + ) + EncodedTDigest.RAM_BYTES; + + private static final TDigestHolder EMPTY = new TDigestHolder() { + @Override + public void reset(BytesRef encodedDigest, double min, double max, double sum, long valueCount) { + throw new UnsupportedOperationException("This instance is immutable"); } - } + }; private static final TransportVersion ESQL_SERIALIZEABLE_TDIGEST = TransportVersion.fromName("esql_serializeable_tdigest"); @@ -56,58 +59,44 @@ public class TDigestHolder implements GenericNamedWriteable { - private final double min; - private final double max; - private final double sum; - private final long valueCount; - private final BytesRef encodedDigest; + private final EncodedTDigest encodedDigest = new EncodedTDigest(); + private final BytesRef scratchBytesRef = new BytesRef(); - // TODO - Deal with the empty array case better - public TDigestHolder(BytesRef encodedDigest, double min, double max, double sum, long valueCount) { - this.encodedDigest = encodedDigest; - this.min = min; - this.max = max; - this.sum = sum; - this.valueCount = valueCount;
+ private double min = Double.NaN; + private double max = Double.NaN; + private double sum = Double.NaN; + private long valueCount = 0L; + + public TDigestHolder() {} + + BytesRef scratchBytesRef() { + return scratchBytesRef; } - public TDigestHolder(TDigestState rawTDigest, double min, double max, double sum, long valueCount) { - try { - this.encodedDigest = encodeCentroidsAndCounts(rawTDigest); - } catch (IOException e) { - throw new IllegalStateException("Error encoding TDigest", e); - } + // Note that this constructor allocates the bytes without memory accounting + public TDigestHolder(StreamInput in) throws IOException { + this.encodedDigest.reset(in.readBytesRef()); + this.min = in.readDouble(); + this.max = in.readDouble(); + this.sum = in.readDouble(); + this.valueCount = in.readVLong(); + } + + public void reset(BytesRef encodedDigest, double min, double max, double sum, long valueCount) { this.min = min; this.max = max; this.sum = sum; this.valueCount = valueCount; - } - - // TODO: Probably TDigestHolder and ParsedTDigest should be the same object - public TDigestHolder(TDigestParser.ParsedTDigest parsed) throws IOException { - this(parsed.centroids(), parsed.counts(), parsed.min(), parsed.max(), parsed.sum(), parsed.count()); - } - - public TDigestHolder(List centroids, List counts, double min, double max, double sum, long valueCount) - throws IOException { - this(encodeCentroidsAndCounts(centroids, counts), min, max, sum, valueCount); + this.encodedDigest.reset(encodedDigest); } public static TDigestHolder empty() { return EMPTY; } - public TDigestHolder(StreamInput in) throws IOException { - this.encodedDigest = in.readBytesRef(); - this.min = in.readDouble(); - this.max = in.readDouble(); - this.sum = in.readDouble(); - this.valueCount = in.readVLong(); - } - @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBytesRef(encodedDigest); + out.writeBytesRef(encodedDigest.encodedDigest()); out.writeDouble(min); 
out.writeDouble(max); out.writeDouble(sum); @@ -121,86 +110,49 @@ public boolean equals(Object o) { && Double.compare(max, that.max) == 0 && Double.compare(sum, that.sum) == 0 && valueCount == that.valueCount - && Objects.equals(encodedDigest, that.encodedDigest); + && Objects.equals(encodedDigest.encodedDigest(), that.encodedDigest.encodedDigest()); } return false; } @Override public int hashCode() { - return Objects.hash(min, max, sum, valueCount, encodedDigest); + return Objects.hash(min, max, sum, valueCount, encodedDigest.encodedDigest()); } - private static BytesRef encodeCentroidsAndCounts(List centroids, List counts) throws IOException { - // TODO: This is copied from the method of the same name in TDigestFieldMapper. It would be nice to find a way to reuse that code - BytesStreamOutput streamOutput = new BytesStreamOutput(); - - for (int i = 0; i < centroids.size(); i++) { - long count = counts.get(i); - assert count >= 0; - // we do not add elements with count == 0 - if (count > 0) { - streamOutput.writeVLong(count); - streamOutput.writeDouble(centroids.get(i)); - } - } - - BytesRef docValue = streamOutput.bytes().toBytesRef(); - return docValue; + public BytesRef getEncodedDigest() { + return encodedDigest.encodedDigest(); } - private static BytesRef encodeCentroidsAndCounts(TDigestState rawTDigest) throws IOException { - // TODO: This is copied from the method of the same name in TDigestFieldMapper. 
It would be nice to find a way to reuse that code - BytesStreamOutput streamOutput = new BytesStreamOutput(); - - for (Iterator it = rawTDigest.uniqueCentroids(); it.hasNext();) { - Centroid centroid = it.next(); - if (centroid.count() > 0) { - streamOutput.writeVLong(centroid.count()); - streamOutput.writeDouble(centroid.mean()); - } - } - - BytesRef docValue = streamOutput.bytes().toBytesRef(); - return docValue; + @Override + public double getMax() { + return max; } - public void addTo(TDigestState state) { - try { - // TODO: The decoding is copied from TDigestFieldMapper. It would be nice to find a way to reuse that code - ByteArrayStreamInput values = new ByteArrayStreamInput(); - values.reset(encodedDigest.bytes, encodedDigest.offset, encodedDigest.length); - while (values.available() > 0) { - long count = values.readVLong(); - double centroid = values.readDouble(); - state.add(centroid, count); - } - } catch (IOException e) { - throw new IllegalStateException("Malformed TDigest bytes", e); - } + @Override + public double getMin() { + return min; } - public BytesRef getEncodedDigest() { - return encodedDigest; + @Override + public long size() { + return valueCount; } - // TODO - compute these if they're not given? or do that at object creation time, maybe. - public double getMax() { - return max; + @Override + public Collection centroids() { + return encodedDigest.centroids(); } - public double getMin() { - return min; + @Override + public int centroidCount() { + return encodedDigest.centroidCount(); } public double getSum() { return sum; } - public long getValueCount() { - return valueCount; - } - @Override public String toString() { // TODO: this is largely duplicated from TDigestFieldMapepr's synthetic source support, and we should refactor all of that. 
@@ -217,26 +169,20 @@ public String toString() { builder.field("sum", this.getSum()); } - // TODO: Would be nice to wrap all of this in reusable objects and minimize allocations here - ByteArrayStreamInput values = new ByteArrayStreamInput(); - values.reset(encodedDigest.bytes, encodedDigest.offset, encodedDigest.length); - List centroids = new ArrayList<>(); - List counts = new ArrayList<>(); - while (values.available() > 0) { - counts.add(values.readVLong()); - centroids.add(values.readDouble()); - } - // TODO: reuse the constans from the field type + builder.startArray("centroids"); - for (Double centroid : centroids) { - builder.value(centroid.doubleValue()); + + EncodedTDigest.CentroidIterator iterator = encodedDigest.centroidIterator(); + while (iterator.next()) { + builder.value(iterator.currentMean()); } builder.endArray(); builder.startArray("counts"); - for (Long count : counts) { - builder.value(count.longValue()); + iterator = encodedDigest.centroidIterator(); + while (iterator.next()) { + builder.value(iterator.currentCount()); } builder.endArray(); builder.endObject(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunctionTests.java index aa0cd089af2e2..3583ea4eb6602 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestAggregatorFunctionTests.java @@ -57,13 +57,13 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { @Override protected void assertSimpleOutput(List input, Block result) { List inputValues = input.stream().flatMap(p -> allTDigests(p.getBlock(0))).toList(); - TDigestHolder value = ((TDigestBlock) 
result).getTDigestHolder(0); + TDigestHolder value = ((TDigestBlock) result).getTDigestHolder(0, new TDigestHolder()); assertThat(TDigestTestUtils.isMergedFrom(value, inputValues), equalTo(true)); } protected static Stream allTDigests(Block input) { TDigestBlock b = (TDigestBlock) input; - return allValueOffsets(b).mapToObj(b::getTDigestHolder); + return allValueOffsets(b).mapToObj(offset -> b.getTDigestHolder(offset, new TDigestHolder())); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunctionTests.java index 45b003d022052..b8c3f95a5d91a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/HistogramMergeTDigestGroupingAggregatorFunctionTests.java @@ -56,7 +56,8 @@ protected void assertSimpleGroup(List input, Block result, int position, L TDigestHolder value = null; if (result.isNull(position) == false) { - value = ((TDigestBlock) result).getTDigestHolder(position); + TDigestBlock tDigestBlock = (TDigestBlock) result; + value = tDigestBlock.getTDigestHolder(tDigestBlock.getFirstValueIndex(position), new TDigestHolder()); } if (allHistograms.isEmpty()) { @@ -68,6 +69,6 @@ protected void assertSimpleGroup(List input, Block result, int position, L protected static Stream allTDigests(Page page, Long group) { TDigestBlock b = page.getBlock(1); - return allValueOffsets(page, group).mapToObj(b::getTDigestHolder); + return allValueOffsets(page, group).mapToObj(offset -> b.getTDigestHolder(offset, new TDigestHolder())); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TDigestBlockTests.java 
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TDigestBlockTests.java index 020603ebb44be..3f31f3a32cf50 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TDigestBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/TDigestBlockTests.java @@ -29,7 +29,7 @@ public void testComponentAccess() { if (block.isNull(i)) { assertThat(componentBlock.isNull(i), equalTo(true)); } else { - TDigestHolder histo = block.getTDigestHolder(i); + TDigestHolder histo = block.getTDigestHolder(i, new TDigestHolder()); switch (component) { case MIN -> { double expectedMin = histo.getMin(); @@ -63,7 +63,7 @@ public void testComponentAccess() { case COUNT -> { assertThat(componentBlock.getValueCount(i), equalTo(1)); int valueIndex = componentBlock.getFirstValueIndex(i); - assertThat(((DoubleBlock) componentBlock).getDouble(valueIndex), equalTo((double) histo.getValueCount())); + assertThat(((DoubleBlock) componentBlock).getDouble(valueIndex), equalTo((double) histo.size())); } } } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index 81407ed0de58f..d3d16ffef8f52 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -8,6 +8,8 @@ package org.elasticsearch.compute.test; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.compute.data.AggregateMetricDoubleBlock; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; import org.elasticsearch.compute.data.Block; @@ -38,8 +40,9 @@ import 
org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ReleasableExponentialHistogram; import org.elasticsearch.exponentialhistogram.ZeroBucket; -import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; import org.elasticsearch.tdigest.Centroid; +import org.elasticsearch.tdigest.TDigest; import org.hamcrest.Matcher; import java.io.IOException; @@ -54,7 +57,6 @@ import static org.elasticsearch.common.time.DateUtils.MAX_MILLIS_BEFORE_9999; import static org.elasticsearch.compute.data.BlockUtils.toJavaObject; import static org.elasticsearch.test.ESTestCase.between; -import static org.elasticsearch.test.ESTestCase.fail; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomFloat; @@ -383,7 +385,7 @@ public static List valuesAtPosition(Block block, int position, boolean em i++, new ExponentialHistogramScratch() ); - case TDIGEST -> ((TDigestBlock) block).getTDigestHolder(i++); + case TDIGEST -> ((TDigestBlock) block).getTDigestHolder(i++, new TDigestHolder()); default -> throw new IllegalArgumentException("unsupported element type [" + block.elementType() + "]"); }); } @@ -480,34 +482,30 @@ public static ExponentialHistogram randomExponentialHistogram() { public static TDigestHolder randomTDigest() { // TODO: This is mostly copied from TDigestFieldMapperTests and EsqlTestUtils; refactor it. 
int size = between(1, 100); - // Note - we use TDigestState to build an actual t-digest for realistic values here - TDigestState digest = TDigestState.createWithoutCircuitBreaking(100); + NoopCircuitBreaker noopBreaker = new NoopCircuitBreaker("test-breaker"); + TDigest digest = TDigest.createMergingDigest(new MemoryTrackingTDigestArrays(noopBreaker), 100); for (int i = 0; i < size; i++) { double sample = randomGaussianDouble(); int count = randomIntBetween(1, Integer.MAX_VALUE); digest.add(sample, count); } - List centroids = new ArrayList<>(); - List counts = new ArrayList<>(); double sum = 0.0; - long valueCount = 0L; for (Centroid c : digest.centroids()) { - centroids.add(c.mean()); - counts.add(c.count()); sum += c.mean() * c.count(); - valueCount += c.count(); } - double min = digest.getMin(); - double max = digest.getMax(); - TDigestHolder returnValue = null; - try { - returnValue = new TDigestHolder(centroids, counts, min, max, sum, valueCount); + TDigestHolder digestHolder = new TDigestHolder(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + // TODO: replace with BreakingTDigestHolder when added + for (Centroid centroid : digest.centroids()) { + out.writeVLong(centroid.count()); + out.writeDouble(centroid.mean()); + } + digestHolder.reset(out.bytes().toBytesRef(), digest.getMin(), digest.getMax(), sum, digest.size()); } catch (IOException e) { - // This is a test util, so we're just going to fail the test here - fail(e); + throw new IllegalStateException("failed to encode test TDigest", e); } - return returnValue; + return digestHolder; } public static Block asBlock(BlockFactory blockFactory, ElementType elementType, List values) { diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/NullInsertingSourceOperator.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/NullInsertingSourceOperator.java index a07db77a1876f..3fac11bfe15ce 100644 --- 
a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/NullInsertingSourceOperator.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/NullInsertingSourceOperator.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.data.TDigestBlock; import org.elasticsearch.compute.data.TDigestBlockBuilder; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.compute.operator.SourceOperator; import java.util.Arrays; @@ -120,7 +121,7 @@ private void copyValue(Block from, int valueIndex, Block.Builder into) { ); break; case TDIGEST: - ((TDigestBlockBuilder) into).appendTDigest(((TDigestBlock) from).getTDigestHolder(valueIndex)); + ((TDigestBlockBuilder) into).appendTDigest(((TDigestBlock) from).getTDigestHolder(valueIndex, new TDigestHolder())); break; default: throw new IllegalArgumentException("unknown block type " + elementType); diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TDigestTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TDigestTestUtils.java index 45317a23cba9a..2fa6efd96c4b3 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TDigestTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TDigestTestUtils.java @@ -7,15 +7,21 @@ package org.elasticsearch.compute.test; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.compute.aggregation.TDigestStates; import org.elasticsearch.compute.data.TDigestHolder; -import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; import org.elasticsearch.tdigest.Centroid; +import org.elasticsearch.tdigest.TDigest; import java.util.Collection; public class 
TDigestTestUtils { + private static final CircuitBreaker NOOP_BREAKER = new NoopCircuitBreaker("test-breaker"); + private static final MemoryTrackingTDigestArrays NOOP_ARRAYS = new MemoryTrackingTDigestArrays(NOOP_BREAKER); + /** * Utility method for verifying that a TDigestHolder is a correct merge of a collection of TDigestHolders. * TDigest is non-deterministic, we just do a sanity check here: @@ -29,22 +35,22 @@ public static boolean isMergedFrom(TDigestHolder merged, Collection 0) { - if (Math.abs(decoded.quantile(0.01) - reference.quantile(0.01)) > 0.1) { + if (Math.abs(mergedAsTDigest.quantile(0.01) - reference.quantile(0.01)) > 0.1) { return false; } - if (Math.abs(decoded.quantile(0.99) - reference.quantile(0.99)) > 0.1) { + if (Math.abs(mergedAsTDigest.quantile(0.99) - reference.quantile(0.99)) > 0.1) { return false; } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java index 336804221d627..6b6bcc4180ed3 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java @@ -44,6 +44,7 @@ import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import org.elasticsearch.xpack.core.analytics.mapper.TDigestParser; import org.elasticsearch.xpack.esql.action.ResponseValueUtils; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -784,7 +785,15 @@ private static TDigestHolder parseTDigest(@Nullable String json) { DocumentParsingException::new, XContentParserUtils::parsingException ); - return new TDigestHolder(parsed.centroids(), parsed.counts(), parsed.min(), parsed.max(), 
parsed.sum(), parsed.count()); + TDigestHolder tdigest = new TDigestHolder(); + tdigest.reset( + EncodedTDigest.encodeCentroids(parsed.centroids(), parsed.counts()), + parsed.min(), + parsed.max(), + parsed.sum(), + parsed.count() + ); + return tdigest; } catch (IOException e) { throw new IllegalArgumentException(e); } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 9e6c5b251299d..5e07a2269668a 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -78,6 +78,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.analysis.AnalyzerSettings; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; @@ -1187,13 +1188,8 @@ public static TDigestHolder randomTDigest() { double min = digest.getMin(); double max = digest.getMax(); - TDigestHolder returnValue = null; - try { - returnValue = new TDigestHolder(centroids, counts, min, max, sum, valueCount); - } catch (IOException e) { - // This is a test util, so we're just going to fail the test here - fail(e); - } + TDigestHolder returnValue = new TDigestHolder(); + returnValue.reset(EncodedTDigest.encodeCentroids(centroids, counts), min, max, sum, valueCount); return returnValue; } diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestFromHistogramEvaluator.java 
b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestFromHistogramEvaluator.java index d44a3f117d5cd..5111bd6a4a1cd 100644 --- a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestFromHistogramEvaluator.java +++ b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestFromHistogramEvaluator.java @@ -7,6 +7,7 @@ import java.lang.IllegalArgumentException; import java.lang.Override; import java.lang.String; +import java.util.function.Function; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.compute.data.Block; @@ -19,6 +20,7 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.core.Releasables; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import org.elasticsearch.xpack.esql.core.tree.Source; /** @@ -30,10 +32,16 @@ public final class ToTDigestFromHistogramEvaluator extends AbstractConvertFuncti private final EvalOperator.ExpressionEvaluator in; + private final EncodedTDigest decoder; + + private final TDigestHolder scratch; + public ToTDigestFromHistogramEvaluator(Source source, EvalOperator.ExpressionEvaluator in, - DriverContext driverContext) { + EncodedTDigest decoder, TDigestHolder scratch, DriverContext driverContext) { super(driverContext, source); this.in = in; + this.decoder = decoder; + this.scratch = scratch; } @Override @@ -69,7 +77,7 @@ public Block evalVector(Vector v) { private TDigestHolder evalValue(BytesRefVector container, int index, BytesRef scratchPad) { BytesRef value = container.getBytesRef(index, scratchPad); - return ToTDigest.fromHistogram(value); + return ToTDigest.fromHistogram(value, this.decoder, this.scratch); } @Override @@ -109,7 +117,7 @@ public Block evalBlock(Block b) { private 
TDigestHolder evalValue(BytesRefBlock container, int index, BytesRef scratchPad) { BytesRef value = container.getBytesRef(index, scratchPad); - return ToTDigest.fromHistogram(value); + return ToTDigest.fromHistogram(value, this.decoder, this.scratch); } @Override @@ -134,14 +142,22 @@ public static class Factory implements EvalOperator.ExpressionEvaluator.Factory private final EvalOperator.ExpressionEvaluator.Factory in; - public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory in) { + private final Function decoder; + + private final Function scratch; + + public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory in, + Function decoder, + Function scratch) { this.source = source; this.in = in; + this.decoder = decoder; + this.scratch = scratch; } @Override public ToTDigestFromHistogramEvaluator get(DriverContext context) { - return new ToTDigestFromHistogramEvaluator(source, in.get(context), context); + return new ToTDigestFromHistogramEvaluator(source, in.get(context), decoder.apply(context), scratch.apply(context), context); } @Override diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTDigestEvaluator.java b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTDigestEvaluator.java index 64c41067f23ef..0785419a1d416 100644 --- a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTDigestEvaluator.java +++ b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTDigestEvaluator.java @@ -10,7 +10,6 @@ import java.lang.String; import java.util.function.Function; import org.apache.lucene.util.RamUsageEstimator; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.compute.data.Block; import 
org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.Page; @@ -20,6 +19,7 @@ import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.core.Releasables; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; import org.elasticsearch.xpack.esql.core.tree.Source; /** @@ -35,19 +35,19 @@ public final class HistogramPercentileTDigestEvaluator implements EvalOperator.E private final EvalOperator.ExpressionEvaluator percentile; - private final CircuitBreaker breaker; + private final MemoryTrackingTDigestArrays tdigestArrays; private final DriverContext driverContext; private Warnings warnings; public HistogramPercentileTDigestEvaluator(Source source, EvalOperator.ExpressionEvaluator value, - EvalOperator.ExpressionEvaluator percentile, CircuitBreaker breaker, + EvalOperator.ExpressionEvaluator percentile, MemoryTrackingTDigestArrays tdigestArrays, DriverContext driverContext) { this.source = source; this.value = value; this.percentile = percentile; - this.breaker = breaker; + this.tdigestArrays = tdigestArrays; this.driverContext = driverContext; } @@ -70,6 +70,7 @@ public long baseRamBytesUsed() { public DoubleBlock eval(int positionCount, TDigestBlock valueBlock, DoubleBlock percentileBlock) { try(DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) { + TDigestHolder valueScratch = new TDigestHolder(); position: for (int p = 0; p < positionCount; p++) { switch (valueBlock.getValueCount(p)) { case 0: @@ -93,10 +94,10 @@ public DoubleBlock eval(int positionCount, TDigestBlock valueBlock, DoubleBlock result.appendNull(); continue position; } - TDigestHolder value = valueBlock.getTDigestHolder(valueBlock.getFirstValueIndex(p)); + TDigestHolder value = valueBlock.getTDigestHolder(valueBlock.getFirstValueIndex(p), valueScratch); double percentile = 
percentileBlock.getDouble(percentileBlock.getFirstValueIndex(p)); try { - HistogramPercentile.process(result, value, percentile, this.breaker); + HistogramPercentile.process(result, value, percentile, this.tdigestArrays); } catch (ArithmeticException e) { warnings().registerException(e); result.appendNull(); @@ -108,7 +109,7 @@ public DoubleBlock eval(int positionCount, TDigestBlock valueBlock, DoubleBlock @Override public String toString() { - return "HistogramPercentileTDigestEvaluator[" + "value=" + value + ", percentile=" + percentile + ", breaker=" + breaker + "]"; + return "HistogramPercentileTDigestEvaluator[" + "value=" + value + ", percentile=" + percentile + ", tdigestArrays=" + tdigestArrays + "]"; } @Override @@ -130,25 +131,25 @@ static class Factory implements EvalOperator.ExpressionEvaluator.Factory { private final EvalOperator.ExpressionEvaluator.Factory percentile; - private final Function breaker; + private final Function tdigestArrays; public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory value, EvalOperator.ExpressionEvaluator.Factory percentile, - Function breaker) { + Function tdigestArrays) { this.source = source; this.value = value; this.percentile = percentile; - this.breaker = breaker; + this.tdigestArrays = tdigestArrays; } @Override public HistogramPercentileTDigestEvaluator get(DriverContext context) { - return new HistogramPercentileTDigestEvaluator(source, value.get(context), percentile.get(context), breaker.apply(context), context); + return new HistogramPercentileTDigestEvaluator(source, value.get(context), percentile.get(context), tdigestArrays.apply(context), context); } @Override public String toString() { - return "HistogramPercentileTDigestEvaluator[" + "value=" + value + ", percentile=" + percentile + ", breaker=" + breaker + "]"; + return "HistogramPercentileTDigestEvaluator[" + "value=" + value + ", percentile=" + percentile + ", tdigestArrays=" + tdigestArrays + "]"; } } } diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index 28dd5e2d9209e..df3eb991dddab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.LongRangeBlock; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -181,7 +182,7 @@ private static BlockValueExtractor valueExtractorFor(DataType dataType, ZoneId z var to = ((LongRangeBlock) block).getToBlock().getLong(offset); return dateRangeToString(from, to); }; - case TDIGEST -> (block, offset, scratch) -> ((TDigestBlock) block).getTDigestHolder(offset); + case TDIGEST -> (block, offset, scratch) -> ((TDigestBlock) block).getTDigestHolder(offset, new TDigestHolder()); case HISTOGRAM -> (block, offset, scratch) -> EsqlDataTypeConverter.histogramToString( ((BytesRefBlock) block).getBytesRef(offset, scratch) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigest.java index 488ab6b7a6967..b03b7af0ac4c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigest.java @@ -8,12 +8,13 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.convert; 
import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.compute.ann.ConvertEvaluator; +import org.elasticsearch.compute.ann.Fixed; import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -25,10 +26,11 @@ import org.elasticsearch.xpack.esql.expression.function.Param; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL; + public class ToTDigest extends AbstractConvertFunction { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( @@ -39,7 +41,10 @@ public class ToTDigest extends AbstractConvertFunction { private static final Map EVALUATORS = Map.ofEntries( Map.entry(DataType.TDIGEST, (source, field) -> field), - Map.entry(DataType.HISTOGRAM, ToTDigestFromHistogramEvaluator.Factory::new) + Map.entry( + DataType.HISTOGRAM, + (source, in) -> new ToTDigestFromHistogramEvaluator.Factory(source, in, dc -> new EncodedTDigest(), dc -> new TDigestHolder()) + ) ); @FunctionInfo( @@ -86,39 +91,36 @@ public String getWriteableName() { } @ConvertEvaluator(extraName = "FromHistogram", warnExceptions = { IllegalArgumentException.class }) - static TDigestHolder fromHistogram(BytesRef in) { + static TDigestHolder fromHistogram( + BytesRef in, + @Fixed(includeInToString = false, scope = THREAD_LOCAL) EncodedTDigest decoder, + @Fixed(includeInToString = false, scope = THREAD_LOCAL) TDigestHolder scratch + ) { if (in.length > 
ByteSizeUnit.MB.toBytes(2)) { throw new IllegalArgumentException("Histogram length is greater than 2MB"); } // even though the encoded format is the same, we need to decode here to compute the summary data - List centroids = new ArrayList<>(); - List counts = new ArrayList<>(); - ByteArrayStreamInput streamInput = new ByteArrayStreamInput(); - streamInput.reset(in.bytes, in.offset, in.length); + decoder.reset(in); double min = Double.MAX_VALUE; double max = Double.MIN_VALUE; double sum = 0; long totalCount = 0; - try { - while (streamInput.available() > 0) { - long count = streamInput.readVLong(); - double value = Double.longBitsToDouble(streamInput.readLong()); - min = Math.min(min, value); - max = Math.max(max, value); - sum += value * count; - totalCount += count; - centroids.add(value); - counts.add(count); - } - if (totalCount == 0) { - min = Double.NaN; - max = Double.NaN; - sum = Double.NaN; - } - return new TDigestHolder(centroids, counts, min, max, sum, totalCount); - } catch (IOException e) { - throw new IllegalArgumentException(e.getMessage()); + EncodedTDigest.CentroidIterator it = decoder.centroidIterator(); + while (it.next()) { + long count = it.currentCount(); + double value = it.currentMean(); + min = Math.min(min, value); + max = Math.max(max, value); + sum += value * count; + totalCount += count; + } + if (totalCount == 0) { + min = Double.NaN; + max = Double.NaN; + sum = Double.NaN; } + scratch.reset(in, min, max, sum, totalCount); + return scratch; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentile.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentile.java index 54fd56a764f8c..7a84a16ca899e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentile.java +++ 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentile.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.expression.function.scalar.histogram; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -16,11 +15,11 @@ import org.elasticsearch.compute.ann.Fixed; import org.elasticsearch.compute.data.DoubleBlock; import org.elasticsearch.compute.data.TDigestHolder; -import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; -import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; +import org.elasticsearch.tdigest.TDigest; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; @@ -137,12 +136,12 @@ static void process( DoubleBlock.Builder resultBuilder, TDigestHolder value, double percentile, - @Fixed(scope = Fixed.Scope.THREAD_LOCAL) CircuitBreaker breaker + @Fixed(scope = Fixed.Scope.THREAD_LOCAL) MemoryTrackingTDigestArrays tdigestArrays ) { checkPercentileRange(percentile); // TODO: add a way of clearing a TDigestState, so that we can reuse it via @Fixed across calls - try (TDigestState scratch = TDigestState.createOfType(breaker, TDigestState.Type.MERGING, TDigestStates.COMPRESSION)) { - value.addTo(scratch); + try (TDigest scratch = TDigest.createMergingDigest(tdigestArrays, TDigestStates.COMPRESSION)) { + scratch.add(value); double result = scratch.quantile(percentile / 100.0); if 
(Double.isNaN(result)) { // can happen if the histogram is empty resultBuilder.appendNull(); @@ -173,7 +172,7 @@ public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvalua source(), fieldEvaluator, percentileEvaluator, - DriverContext::breaker + driverContext -> new MemoryTrackingTDigestArrays(driverContext.breaker()) ); default -> throw EsqlIllegalArgumentException.illegalDataType(valueType); }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java index 72e1f8b845e99..5a4db7cfbf505 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java @@ -859,7 +859,7 @@ public static String exponentialHistogramBlockToString(ExponentialHistogramBlock } public static String tDigestBlockToString(TDigestBlock tDigestBlock, int index) { - TDigestHolder digest = tDigestBlock.getTDigestHolder(index); + TDigestHolder digest = tDigestBlock.getTDigestHolder(index, new TDigestHolder()); return tDigestToString(digest); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 30e6ad98ce5a0..f4f335a649ae3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import 
org.elasticsearch.xpack.core.analytics.mapper.TDigestParser; import org.elasticsearch.xpack.esql.CsvTestUtils; import org.elasticsearch.xpack.esql.EsqlTestUtils; @@ -1540,18 +1541,24 @@ static Page valuesToPage(BlockFactory blockFactory, List columns throw new IllegalArgumentException("Expected START_OBJECT but found: " + parser.currentToken()); } parser.nextToken(); - TDigestHolder parsed = new TDigestHolder( - TDigestParser.parse( - "serialized_block", - parser, - (a, b) -> new UnsupportedOperationException("failed parsing tdigest"), - (x, y, z) -> new UnsupportedOperationException("failed parsing tdigest") - ) + TDigestParser.ParsedTDigest parsed = TDigestParser.parse( + "serialized_block", + parser, + (a, b) -> new UnsupportedOperationException("failed parsing tdigest"), + (x, y, z) -> new UnsupportedOperationException("failed parsing tdigest") ); if (parsed == null) { tDigestBlockBuilder.appendNull(); } else { - tDigestBlockBuilder.appendTDigest(parsed); + TDigestHolder tdigest = new TDigestHolder(); + tdigest.reset( + EncodedTDigest.encodeCentroids(parsed.centroids(), parsed.counts()), + parsed.min(), + parsed.max(), + parsed.sum(), + parsed.count() + ); + tDigestBlockBuilder.appendTDigest(tdigest); } } catch (UnsupportedOperationException | IOException e) { fail("Unable to parse TDigestBlockBuilder: " + e.getMessage()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgTests.java index c9237a1a0a0f9..73f9f75b14881 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgTests.java @@ -129,10 +129,10 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier } case TDIGEST -> { var tDigest = (TDigestHolder) 
fieldData.get(0); - if (tDigest.getValueCount() == 0) { + if (tDigest.size() == 0) { yield null; } - yield tDigest.getSum() / tDigest.getValueCount(); + yield tDigest.getSum() / tDigest.size(); } default -> { double value = ((Number) fieldData.get(0)).doubleValue(); @@ -169,7 +169,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier } case TDIGEST -> { double sum = fieldData.stream().mapToDouble(v -> ((TDigestHolder) v).getSum()).sum(); - double count = fieldData.stream().mapToLong(v -> ((TDigestHolder) v).getValueCount()).sum(); + double count = fieldData.stream().mapToLong(v -> ((TDigestHolder) v).size()).sum(); if (count == 0) { yield null; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountTests.java index c8ffcfd676820..c1ab010ac7cc7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountTests.java @@ -135,7 +135,7 @@ static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier fieldSup } else if (fieldSupplier.type() == DataType.TDIGEST) { count = fieldData.stream().mapToLong(data -> { TDigestHolder tdigest = (TDigestHolder) data; - return tdigest.getValueCount(); + return tdigest.size(); }).sum(); } else if (fieldSupplier.type() == DataType.EXPONENTIAL_HISTOGRAM) { count = fieldData.stream().mapToLong(obj -> ((ExponentialHistogram) obj).valueCount()).sum(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxTests.java index 4e6715d7d544f..278319e685286 100644 --- 
a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxTests.java @@ -243,7 +243,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier expected = fieldTypedData.multiRowData() .stream() .map(obj -> (TDigestHolder) obj) - .filter(histo -> histo.getValueCount() > 0) // only non-empty histograms have an influence + .filter(histo -> histo.size() > 0) // only non-empty histograms have an influence .map(TDigestHolder::getMax) .max(Comparator.naturalOrder()) .orElse(null); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinTests.java index ffd5036869930..2db8b8ffd816b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinTests.java @@ -243,7 +243,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier expected = fieldTypedData.multiRowData() .stream() .map(obj -> (TDigestHolder) obj) - .filter(histo -> histo.getValueCount() > 0) // only non-empty histograms have an influence + .filter(histo -> histo.size() > 0) // only non-empty histograms have an influence .map(TDigestHolder::getMin) .min(Comparator.naturalOrder()) .orElse(null); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java index 58dc5db36dd8c..5b504ad5ad325 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileTests.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.Name; import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.aggregation.TDigestStates; import org.elasticsearch.compute.data.TDigestHolder; @@ -18,7 +19,9 @@ import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker; import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger; import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.tdigest.TDigest; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -124,10 +127,16 @@ public static Double getExpectedPercentileForExponentialHistograms(List values, double percentile) { - TDigestState merged = TDigestState.createWithoutCircuitBreaking(TDigestStates.COMPRESSION); - values.stream().filter(Objects::nonNull).forEach(tDigestHolder -> tDigestHolder.addTo(merged)); - double result = merged.quantile(percentile / 100.0); - return Double.isNaN(result) ? null : result; + try ( + TDigest merged = TDigest.createMergingDigest( + new MemoryTrackingTDigestArrays(new NoopCircuitBreaker("test-breaker")), + TDigestStates.COMPRESSION + ) + ) { + values.stream().filter(Objects::nonNull).forEach(merged::add); + double result = merged.quantile(percentile / 100.0); + return Double.isNaN(result) ? 
null : result; + } } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java index 4197bc61fc7fd..1fc02b0d4efd5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java @@ -132,7 +132,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier case TDIGEST -> { var sums = data.stream() .map(obj -> (TDigestHolder) obj) - .filter(obj -> obj.getValueCount() > 0) + .filter(obj -> obj.size() > 0) .mapToDouble(TDigestHolder::getSum) .toArray(); yield sums.length == 0 ? null : Arrays.stream(sums).sum(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java index 932f644224d08..e535ebb2f4da4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java @@ -143,7 +143,7 @@ private static TestCaseSupplier makeSupplier(TestCaseSupplier.TypedDataSupplier case TDIGEST -> { var sums = data.stream() .map(obj -> (TDigestHolder) obj) - .filter(obj -> obj.getValueCount() > 0) + .filter(obj -> obj.size() > 0) .mapToDouble(TDigestHolder::getSum) .toArray(); yield sums.length == 0 ? 
null : Arrays.stream(sums).sum(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestTests.java index b73a28b4f9adb..daa01ac15e45c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToTDigestTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.xpack.core.analytics.mapper.EncodedTDigest; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -88,7 +89,9 @@ static TDigestHolder fromHistogram(BytesRef in) { max = Double.NaN; sum = Double.NaN; } - return new TDigestHolder(centroids, counts, min, max, sum, totalCount); + TDigestHolder tdigest = new TDigestHolder(); + tdigest.reset(EncodedTDigest.encodeCentroids(centroids, counts), min, max, sum, totalCount); + return tdigest; } catch (IOException e) { throw new IllegalArgumentException(e.getMessage()); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/ExtractHistogramComponentTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/ExtractHistogramComponentTests.java index a6f04a9c43dcd..7cee3b42ec9f3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/ExtractHistogramComponentTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/ExtractHistogramComponentTests.java @@ -105,8 +105,8 @@ yield switch (component) { double max = value.getMax(); yield Double.isNaN(max) ? null : max; } - case SUM -> value.getValueCount() > 0 ? value.getSum() : null; - case COUNT -> (double) value.getValueCount(); + case SUM -> value.size() > 0 ? value.getSum() : null; + case COUNT -> (double) value.size(); }; } default -> throw new IllegalStateException("Unexpected histogram type [" + histogram.type() + "]"); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTests.java index 9d60b99729c64..dd8223a6bef77 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/histogram/HistogramPercentileTests.java @@ -15,7 +15,8 @@ import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramQuantile; -import org.elasticsearch.search.aggregations.metrics.TDigestState; +import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; +import org.elasticsearch.tdigest.TDigest; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -101,8 +102,10 @@ private static double getExpectedPercentile(Object histogramObj, double percVal) case ExponentialHistogram expHisto -> ExponentialHistogramQuantile.getQuantile(expHisto, percVal / 100.0); case TDigestHolder tdigest -> { NoopCircuitBreaker 
noopBreaker = new NoopCircuitBreaker("noop-breaker"); - try (TDigestState scratch = TDigestState.createOfType(noopBreaker, TDigestState.Type.MERGING, TDigestStates.COMPRESSION)) { - tdigest.addTo(scratch); + try ( + TDigest scratch = TDigest.createMergingDigest(new MemoryTrackingTDigestArrays(noopBreaker), TDigestStates.COMPRESSION) + ) { + scratch.add(tdigest); yield scratch.quantile(percVal / 100.0); } } @@ -118,7 +121,7 @@ private static Matcher getEvaluatorToStringMatcher(DataType histoType, D "HistogramPercentileExponentialHistogramEvaluator[value=Attribute[channel=0], percentile=" + percentileEvaluatorName + "]" ); case TDIGEST -> startsWith( - "HistogramPercentileTDigestEvaluator[value=Attribute[channel=0], percentile=" + percentileEvaluatorName + ", breaker=" + "HistogramPercentileTDigestEvaluator[value=Attribute[channel=0], percentile=" + percentileEvaluatorName + ", tdigestArrays=" ); default -> throw new IllegalStateException("Not a histogram type: " + histoType); };