Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
package org.elasticsearch.xpack.core.analytics.mapper;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tdigest.Centroid;
import org.elasticsearch.tdigest.TDigestReadView;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
Expand All @@ -30,6 +33,12 @@
*/
public final class EncodedTDigest implements TDigestReadView {

/**
* The size of a single EncodedTDigest instance in bytes, excluding the underlying encoded digest bytes array.
*/
public static final long RAM_BYTES = RamUsageEstimator.shallowSizeOfInstance(EncodedTDigest.class) + RamUsageEstimator
.shallowSizeOfInstance(BytesRef.class);

private final BytesRef encodedDigest = new BytesRef();
private long cachedSize = -1L;
private double cachedMax = Double.NaN;
Expand Down Expand Up @@ -73,63 +82,85 @@ public CentroidIterator centroidIterator() {
/**
* Encodes the provided centroids into a {@link BytesRef}.
*/
public static BytesRef encodeCentroids(List<? extends Centroid> centroids) {
return encodeCentroidsFromIterator(new CentroidIterator() {
private int index = -1;
public static BytesRef encodeCentroids(Collection<? extends Centroid> centroids) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
encodeCentroids(centroids, out);
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new IllegalStateException("Failed to encode centroid", e);
}
}

public static void encodeCentroids(Collection<? extends Centroid> centroids, StreamOutput out) throws IOException {
encodeCentroidsFromIterator(new CentroidIterator() {
private final Iterator<? extends Centroid> iterator = centroids.iterator();
Centroid centroid;

@Override
public boolean next() {
index++;
return index < centroids.size();
if (iterator.hasNext() == false) {
return false;
}
centroid = iterator.next();
return true;
}

@Override
public long currentCount() {
return centroids.get(index).count();
assert centroid != null : "next() must be called and return true before accessing current centroid";
return centroid.count();
}

@Override
public double currentMean() {
return centroids.get(index).mean();
assert centroid != null : "next() must be called and return true before accessing current centroid";
return centroid.mean();
}

@Override
public boolean hasNext() {
return index + 1 < centroids.size();
return iterator.hasNext();
}

});
}, out);
}

/**
* Encodes centroids represented by independent means and counts lists.
*/
public static BytesRef encodeCentroids(List<Double> means, List<Long> counts) {
assert means.size() == counts.size() : "centroids and counts must have equal size";
return encodeCentroidsFromIterator(new CentroidIterator() {
private int index = -1;
try (BytesStreamOutput out = new BytesStreamOutput()) {
encodeCentroidsFromIterator(new CentroidIterator() {
private int index = -1;

@Override
public boolean next() {
index++;
return index < means.size();
}
@Override
public boolean next() {
index++;
return index < means.size();
}

@Override
public long currentCount() {
return counts.get(index);
}
@Override
public long currentCount() {
assert index >= 0 : "next() must be called and return true before accessing current centroid";
return counts.get(index);
}

@Override
public double currentMean() {
return means.get(index);
}
@Override
public double currentMean() {
assert index >= 0 : "next() must be called and return true before accessing current centroid";
return means.get(index);
}

@Override
public boolean hasNext() {
return index + 1 < means.size();
}
});
@Override
public boolean hasNext() {
return index + 1 < means.size();
}
}, out);
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new IllegalStateException("Failed to encode centroid", e);
}
}

@Override
Expand Down Expand Up @@ -172,21 +203,16 @@ public Collection<Centroid> centroids() {
return Collections.unmodifiableList(decoded);
}

private static BytesRef encodeCentroidsFromIterator(CentroidIterator centroids) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
while (centroids.next()) {
long count = centroids.currentCount();
if (count < 0) {
throw new IllegalArgumentException("Centroid count cannot be negative: " + count);
}
if (count > 0) {
out.writeVLong(count);
out.writeDouble(centroids.currentMean());
}
private static void encodeCentroidsFromIterator(CentroidIterator centroids, StreamOutput out) throws IOException {
while (centroids.next()) {
long count = centroids.currentCount();
if (count < 0) {
throw new IllegalArgumentException("Centroid count cannot be negative: " + count);
}
if (count > 0) {
out.writeVLong(count);
out.writeDouble(centroids.currentMean());
}
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new IllegalStateException("Failed to encode centroid", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static TypeDef of(TypeName type, String alias, String block, String vecto
TypeDef.of(TypeName.DOUBLE, "DOUBLE", "DoubleBlock", "DoubleVector", null),
TypeDef.of(BYTES_REF, "BYTES_REF", "BytesRefBlock", "BytesRefVector", BYTES_REF),
TypeDef.of(EXPONENTIAL_HISTOGRAM, "EXPONENTIAL_HISTOGRAM", "ExponentialHistogramBlock", null, EXPONENTIAL_HISTOGRAM_SCRATCH),
TypeDef.of(TDIGEST, "TDIGEST", "TDigestBlock", null, null)
TypeDef.of(TDIGEST, "TDIGEST", "TDigestBlock", null, TDIGEST)
)
.flatMap(def -> Stream.of(def.type.toString(), def.type + "[]", def.alias).map(alias -> Map.entry(alias, def)))
.collect(toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading