diff --git a/NOTICES b/NOTICES new file mode 100644 index 0000000000000..9c15b9d6155f4 --- /dev/null +++ b/NOTICES @@ -0,0 +1,3 @@ +The code for the t-digest was originally authored by Ted Dunning + +Adrien Grand contributed the heart of the AVLTreeDigest (https://github.com/jpountz) diff --git a/presto-main/src/main/java/com/facebook/presto/tdigest/Centroid.java b/presto-main/src/main/java/com/facebook/presto/tdigest/Centroid.java new file mode 100644 index 0000000000000..9f92277feb36a --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/tdigest/Centroid.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.facebook.presto.tdigest; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Objects.requireNonNull; + +/** + * A single centroid which represents a number of data points. + */ +@NotThreadSafe +public class Centroid + implements Comparable, Serializable +{ + private static final AtomicInteger uniqueCount = new AtomicInteger(1); + + private double centroid; + private int count; + + // The ID is transient because it must be unique within a given JVM. A new + // ID should be generated from uniqueCount when a Centroid is deserialized. + private transient int id; + + public Centroid(double x) + { + start(x, 1, uniqueCount.getAndIncrement()); + } + + public Centroid(double x, int w) + { + start(x, w, uniqueCount.getAndIncrement()); + } + + public Centroid(double x, int w, int id) + { + start(x, w, id); + } + + private void start(double x, int w, int id) + { + this.id = id; + add(x, w); + } + + public void add(double x, int w) + { + count += w; + centroid += w * (x - centroid) / count; + } + + public double getMean() + { + return centroid; + } + + public int getWeight() + { + return count; + } + + public int getId() + { + return id; + } + + @Override + public String toString() + { + return "Centroid{" + + "mean=" + centroid + + ", count=" + count + + '}'; + } + + @Override + public int hashCode() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Centroid centroid = (Centroid) o; + return this.centroid == centroid.getMean() && this.count == centroid.getWeight(); + } + + @Override + public int compareTo(Centroid o) + { + requireNonNull(o); + int r = Double.compare(centroid, o.centroid); + if (r == 0) { + r = id - o.id; + } + return r; + } +} diff --git 
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Licensed to Ted Dunning under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.tdigest;

import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import org.openjdk.jol.info.ClassLayout;

import javax.annotation.concurrent.NotThreadSafe;

import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import static com.facebook.presto.tdigest.TDigestUtils.maxSize;
import static com.facebook.presto.tdigest.TDigestUtils.normalizer;
import static com.facebook.presto.tdigest.TDigestUtils.reverse;
import static com.facebook.presto.tdigest.TDigestUtils.sort;
import static com.facebook.presto.tdigest.TDigestUtils.weightedAverage;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
import static io.airlift.slice.SizeOf.SIZE_OF_DOUBLE;
import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.slice.Slices.wrappedDoubleArray;
import static java.lang.Math.ceil;
import static java.lang.Math.max;
import static java.lang.Math.toIntExact;
import static java.lang.String.format;
import static java.lang.System.arraycopy;
import static java.util.Collections.shuffle;

/**
 * Maintains a t-digest by collecting new points in a buffer that is then sorted occasionally and merged
 * into a sorted array that contains previously computed centroids.
 *
 * <p>This can be very fast because the cost of sorting and merging is amortized over several insertions. If
 * we keep N centroids total and have the input array of size k, then the amortized cost is approximately
 *
 * <p>N/k + log k
 *
 * <p>These costs even out when N/k = log k. Balancing costs is often a good place to start in optimizing an
 * algorithm.
 */
@NotThreadSafe
public class TDigest
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(TDigest.class).instanceSize();
    private static final double MAX_COMPRESSION_FACTOR = 1_000;
    // Extra slack added to the centroid arrays beyond the compression bound
    private static final double sizeFudge = 30;

    // Smallest and largest values ever added; seeded so the first add() always updates both
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

    // Used only to shuffle centroids when merging another digest, to avoid order bias
    private final Random gen = ThreadLocalRandom.current();

    // Number of buffer merges performed; parity decides forward vs. backward merge direction
    private int mergeCount;

    // publicCompression is how many centroids the user asked for;
    // compression is how many we actually keep internally (2x) for accuracy
    private final double publicCompression;
    private final double compression;

    // points to the first unused centroid
    private int activeCentroids;

    // Total weight already merged into the centroid arrays (excludes the temp buffer)
    private double totalWeight;

    // Weight of each merged centroid, parallel to mean[]
    private final double[] weight;

    // Mean of each merged centroid, kept in ascending order between merges
    private final double[] mean;

    // Weight sitting in the temp buffer, not yet merged into weight[]/mean[]
    private double unmergedWeight;

    // this is the index of the next temporary centroid
    // this is a more Java-like convention than activeCentroids uses
    private int tempUsed;
    private final double[] tempWeight;
    private final double[] tempMean;

    // array used for sorting the temp centroids
    // to avoid allocations during operation
    private final int[] order;

    private TDigest(double compression)
    {
        // ensure compression >= 10
        // default size = 2 * ceil(compression)
        // default bufferSize = 5 * size
        // ensure size > 2 * compression + weightLimitFudge
        // ensure bufferSize > 2 * size

        checkArgument(compression <= MAX_COMPRESSION_FACTOR, "Compression factor cannot exceed %s", MAX_COMPRESSION_FACTOR);
        this.publicCompression = max(compression, 10);
        // publicCompression is how many centroids the user asked for
        // compression is how many we actually keep
        this.compression = 2 * publicCompression;

        // having a big buffer is good for speed
        int bufferSize = 5 * (int) ceil(this.publicCompression + sizeFudge);
        int size = (int) ceil(this.compression + sizeFudge);

        weight = new double[size];
        mean = new double[size];
        tempWeight = new double[bufferSize];
        tempMean = new double[bufferSize];
        order = new int[bufferSize];

        activeCentroids = 0;
    }

    /**
     * Creates an empty t-digest with the given compression factor
     * (clamped to [10, 1000]).
     */
    public static TDigest createTDigest(double compression)
    {
        return new TDigest(compression);
    }

    /**
     * Deserializes a t-digest from the format written by {@link #serialize()}.
     * Returns null for a null slice; throws IllegalArgumentException on a
     * malformed or truncated slice.
     */
    public static TDigest createTDigest(Slice slice)
    {
        if (slice == null) {
            return null;
        }

        SliceInput sliceInput = new BasicSliceInput(slice);
        try {
            byte format = sliceInput.readByte();
            checkArgument(format == 0, "Invalid serialization format for TDigest; expected '0'");
            byte type = sliceInput.readByte();
            checkArgument(type == 0, "Invalid type for TDigest; expected '0' (type double)");
            double min = sliceInput.readDouble();
            double max = sliceInput.readDouble();
            double publicCompression = max(10, sliceInput.readDouble());
            TDigest r = new TDigest(publicCompression);
            r.setMinMax(min, max);
            r.totalWeight = sliceInput.readDouble();
            r.activeCentroids = sliceInput.readInt();
            sliceInput.readBytes(wrappedDoubleArray(r.weight), r.activeCentroids * SIZE_OF_DOUBLE);
            sliceInput.readBytes(wrappedDoubleArray(r.mean), r.activeCentroids * SIZE_OF_DOUBLE);
            sliceInput.close();
            return r;
        }
        catch (IndexOutOfBoundsException e) {
            throw new IllegalArgumentException("Incorrect slice serialization format");
        }
    }

    /**
     * Adds a single value with weight 1.
     */
    public void add(double x)
    {
        add(x, 1);
    }

    /**
     * Adds a value with the given positive weight. The point is buffered;
     * the buffer is merged into the centroids lazily when it fills up.
     *
     * @throws IllegalArgumentException if x is NaN or w is not positive
     */
    public void add(double x, long w)
    {
        checkArgument(!Double.isNaN(x), "Cannot add NaN to t-digest");
        checkArgument(w > 0L, "weight must be > 0");

        // flush the buffer before it can collide with the merged centroids copied in during merge()
        if (tempUsed >= tempWeight.length - activeCentroids - 1) {
            mergeNewValues();
        }
        int where = tempUsed++;
        tempWeight[where] = w;
        tempMean[where] = x;
        unmergedWeight += w;
        if (x < min) {
            min = x;
        }
        if (x > max) {
            max = x;
        }
    }

    /**
     * Merges another digest of the same compression into this one by re-adding
     * its centroids (shuffled to avoid insertion-order bias).
     */
    public void merge(TDigest other)
    {
        checkArgument(other != null, "Cannot merge with a null t-digest");
        checkArgument(this.publicCompression == other.getCompressionFactor(), "TDigests must have the same compression, found (%s, %s)", this.publicCompression,
                other.getCompressionFactor());
        List<Centroid> tmp = new ArrayList<>();
        for (Centroid centroid : other.centroids()) {
            tmp.add(centroid);
        }

        shuffle(tmp, gen);
        for (Centroid centroid : tmp) {
            add(centroid.getMean(), centroid.getWeight());
        }
    }

    private void mergeNewValues()
    {
        mergeNewValues(false, compression);
    }

    // Folds the temp buffer into the centroid arrays at the given compression.
    // force is used by compress() to merge down to publicCompression.
    private void mergeNewValues(boolean force, double compression)
    {
        if (unmergedWeight == 0) {
            return;
        }

        if (force || unmergedWeight > 0) {
            // note that we run the merge in reverse every other merge to avoid left-to-right bias in merging
            merge(tempMean, tempWeight, tempUsed, order, unmergedWeight, mergeCount % 2 == 1, compression);
            mergeCount++;
            tempUsed = 0;
            unmergedWeight = 0;
        }
    }

    /**
     * Core merge: appends the existing centroids to the incoming buffer, sorts
     * everything by mean, then greedily coalesces adjacent points into
     * centroids whose size stays within the t-digest scale-function bound.
     * Rebuilds mean[]/weight[] in place and updates totalWeight/min/max.
     */
    private void merge(double[] incomingMean,
            double[] incomingWeight,
            int incomingCount,
            int[] incomingOrder,
            double unmergedWeight,
            boolean runBackwards,
            double compression)
    {
        // append the already-merged centroids after the incoming buffer entries
        arraycopy(mean, 0, incomingMean, incomingCount, activeCentroids);
        arraycopy(weight, 0, incomingWeight, incomingCount, activeCentroids);
        incomingCount += activeCentroids;

        if (incomingOrder == null) {
            incomingOrder = new int[incomingCount];
        }
        // indirect sort: incomingOrder becomes a permutation ordering incomingMean ascending
        sort(incomingOrder, incomingMean, incomingCount);

        if (runBackwards) {
            reverse(incomingOrder, 0, incomingCount);
        }

        totalWeight += unmergedWeight;

        checkArgument((activeCentroids + incomingCount) > 0, "Active centroids plus incoming count must be > 0, was %s", activeCentroids + incomingCount);
        activeCentroids = 0;
        mean[activeCentroids] = incomingMean[incomingOrder[0]];
        weight[activeCentroids] = incomingWeight[incomingOrder[0]];
        double weightSoFar = 0;

        double normalizer = normalizer(compression, totalWeight);
        for (int i = 1; i < incomingCount; i++) {
            int ix = incomingOrder[i];
            double proposedWeight = weight[activeCentroids] + incomingWeight[ix];
            boolean addThis;

            // q0/q2 are the quantile positions of the current centroid's edges;
            // the scale function allows bigger centroids near the median, smaller at the tails
            double q0 = weightSoFar / totalWeight;
            double q2 = (weightSoFar + proposedWeight) / totalWeight;
            addThis = proposedWeight <= totalWeight * Math.min(maxSize(q0, normalizer), maxSize(q2, normalizer));

            if (addThis) {
                // next point can be merged into existing centroid
                weight[activeCentroids] += incomingWeight[ix];
                mean[activeCentroids] = mean[activeCentroids] + (incomingMean[ix] - mean[activeCentroids]) * incomingWeight[ix] / weight[activeCentroids];
                incomingWeight[ix] = 0;
            }
            else {
                // move to next output, copy out first centroid
                weightSoFar += weight[activeCentroids];

                activeCentroids++;
                mean[activeCentroids] = incomingMean[ix];
                weight[activeCentroids] = incomingWeight[ix];
                incomingWeight[ix] = 0;
            }
        }
        activeCentroids++;

        // sanity check: centroid weights must account for every point added
        // (exact double equality is safe here as long as weights are integral
        // and totals stay below 2^53 — TODO confirm for very large digests)
        double sum = 0;
        for (int i = 0; i < activeCentroids; i++) {
            sum += weight[i];
        }

        checkArgument(sum == totalWeight, "Sum must equal the total weight, but sum:%s != totalWeight:%s", sum, totalWeight);
        if (runBackwards) {
            reverse(mean, 0, activeCentroids);
            reverse(weight, 0, activeCentroids);
        }

        if (totalWeight > 0) {
            min = Math.min(min, mean[0]);
            max = max(max, mean[activeCentroids - 1]);
        }
    }

    /**
     * Merges any pending inputs and compresses the data down to the public setting.
     */
    public void compress()
    {
        mergeNewValues(true, publicCompression);
    }

    /**
     * Returns the total weight of all points added (merged or still buffered).
     */
    public double getSize()
    {
        return totalWeight + unmergedWeight;
    }

    /**
     * Returns an estimate of P(X <= x), or NaN for an empty digest.
     * Interpolates between centroids, treating unit-weight centroids as exact samples.
     */
    public double getCdf(double x)
    {
        if (unmergedWeight > 0) {
            compress();
        }

        if (activeCentroids == 0) {
            return Double.NaN;
        }
        if (activeCentroids == 1) {
            double width = max - min;
            if (x < min) {
                return 0;
            }
            if (x > max) {
                return 1;
            }
            // NOTE(review): for min <= x <= max, x - min <= width always holds,
            // so this branch always returns 0.5 and the interpolation below is
            // unreachable — matches the upstream reference, but worth confirming
            if (x - min <= width) {
                // min and max are too close together to do any viable interpolation
                return 0.5;
            }
            return (x - min) / (max - min);
        }
        int n = activeCentroids;
        if (x < min) {
            return 0;
        }

        if (x > max) {
            return 1;
        }

        // check for the left tail
        if (x < mean[0]) {
            // guarantees we divide by non-zero number and interpolation works
            if (mean[0] - min > 0) {
                // must be a sample exactly at min
                if (x == min) {
                    return 0.5 / totalWeight;
                }
                return (1 + (x - min) / (mean[0] - min) * (weight[0] / 2 - 1)) / totalWeight;
            }
            return 0;
        }
        checkArgument(x >= mean[0], "Value x:%s must be greater than mean of first centroid %s if we got here", x, mean[0]);

        // and the right tail
        if (x > mean[n - 1]) {
            if (max - mean[n - 1] > 0) {
                if (x == max) {
                    return 1 - 0.5 / totalWeight;
                }
                // there has to be a single sample exactly at max
                double dq = (1 + (max - x) / (max - mean[n - 1]) * (weight[n - 1] / 2 - 1)) / totalWeight;
                return 1 - dq;
            }
            return 1;
        }

        // we know that there are at least two centroids and mean[0] < x < mean[n-1]
        // that means that there are either one or more consecutive centroids all at exactly x
        // or there are consecutive centroids, c0 < x < c1
        double weightSoFar = 0;
        for (int it = 0; it < n - 1; it++) {
            // weightSoFar does not include weight[it] yet
            if (mean[it] == x) {
                // dw will accumulate the weight of all of the centroids at x
                double dw = 0;
                while (it < n && mean[it] == x) {
                    dw += weight[it];
                    it++;
                }
                return (weightSoFar + dw / 2) / totalWeight;
            }
            else if (mean[it] <= x && x < mean[it + 1]) {
                // landed between centroids
                if (mean[it + 1] - mean[it] > 0) {
                    // no interpolation needed if we have a singleton centroid
                    double leftExcludedW = 0;
                    double rightExcludedW = 0;
                    if (weight[it] == 1) {
                        if (weight[it + 1] == 1) {
                            // two singletons means no interpolation
                            // left singleton is in, right is out
                            return (weightSoFar + 1) / totalWeight;
                        }
                        else {
                            leftExcludedW = 0.5;
                        }
                    }
                    else if (weight[it + 1] == 1) {
                        rightExcludedW = 0.5;
                    }
                    double dw = (weight[it] + weight[it + 1]) / 2;

                    checkArgument(dw > 1, "dw must be > 1, was %s", dw);
                    checkArgument((leftExcludedW + rightExcludedW) <= 0.5, "Excluded weight must be <= 0.5, was %s", leftExcludedW + rightExcludedW);

                    // adjust endpoints for any singleton
                    double left = mean[it];
                    double right = mean[it + 1];

                    double dwNoSingleton = dw - leftExcludedW - rightExcludedW;

                    checkArgument(right - left > 0, "Centroids should be in ascending order, but mean of left centroid was greater than right centroid");

                    double base = weightSoFar + weight[it] / 2 + leftExcludedW;
                    return (base + dwNoSingleton * (x - left) / (right - left)) / totalWeight;
                }
                else {
                    // caution against floating point madness
                    double dw = (weight[it] + weight[it + 1]) / 2;
                    return (weightSoFar + dw) / totalWeight;
                }
            }
            else {
                weightSoFar += weight[it];
            }
        }
        checkArgument(x == mean[n - 1], "At this point, x must equal the mean of the last centroid");

        return 1 - 0.5 / totalWeight;
    }

    /**
     * Returns an estimate of the q-th quantile (q in [0, 1]), or NaN for an
     * empty digest. Interpolates between adjacent centroids, with special
     * handling for min/max and unit-weight (singleton) centroids.
     */
    public double getQuantile(double q)
    {
        checkArgument(q >= 0 && q <= 1, "q should be in [0,1], got %s", q);
        if (unmergedWeight > 0) {
            compress();
        }

        if (activeCentroids == 0) {
            return Double.NaN;
        }
        else if (activeCentroids == 1) {
            return mean[0];
        }

        int n = activeCentroids;

        // index is the target rank expressed in units of weight
        final double index = q * totalWeight;

        if (index < 1) {
            return min;
        }

        // if the left centroid has more than one sample, we still know
        // that one sample occurred at min so we can do some interpolation
        if (weight[0] > 1 && index < weight[0] / 2) {
            // there is a single sample at min so we interpolate with less weight
            return min + (index - 1) / (weight[0] / 2 - 1) * (mean[0] - min);
        }

        if (index > totalWeight - 1) {
            return max;
        }

        // if the right-most centroid has more than one sample, we still know
        // that one sample occurred at max so we can do some interpolation
        if (weight[n - 1] > 1 && totalWeight - index <= weight[n - 1] / 2) {
            return max - (totalWeight - index - 1) / (weight[n - 1] / 2 - 1) * (max - mean[n - 1]);
        }

        // in between extremes we interpolate between centroids
        double weightSoFar = weight[0] / 2;
        for (int i = 0; i < n - 1; i++) {
            // centroids i and i + 1 bracket our current point
            double dw = (weight[i] + weight[i + 1]) / 2;
            if (weightSoFar + dw > index) {
                // check for unit weight
                double leftUnit = 0;
                if (weight[i] == 1) {
                    if (index - weightSoFar < 0.5) {
                        // within the singleton's sphere
                        return mean[i];
                    }
                    else {
                        leftUnit = 0.5;
                    }
                }
                double rightUnit = 0;
                if (weight[i + 1] == 1) {
                    if (weightSoFar + dw - index <= 0.5) {
                        // no interpolation needed near singleton
                        return mean[i + 1];
                    }
                    rightUnit = 0.5;
                }
                double z1 = index - weightSoFar - leftUnit;
                double z2 = weightSoFar + dw - index - rightUnit;
                return weightedAverage(mean[i], z2, mean[i + 1], z1);
            }
            weightSoFar += dw;
        }

        checkArgument(weight[n - 1] > 1, "Expected weight[n - 1] > 1, but was %s", weight[n - 1]);
        checkArgument(index <= totalWeight, "Expected index <= totalWeight, but index:%s > totalWeight:%s", index, totalWeight);
        checkArgument(index >= totalWeight - weight[n - 1] / 2, "Expected index >= totalWeight - weight[n - 1] / 2, but" +
                "index:%s < %s", index, totalWeight - weight[n - 1] / 2);

        // weightSoFar = totalWeight - weight[n - 1] / 2 (very nearly)
        // so we interpolate out to max value ever seen
        double z1 = index - totalWeight - weight[n - 1] / 2.0;
        double z2 = weight[n - 1] / 2 - z1;
        return weightedAverage(mean[n - 1], z1, max, z2);
    }

    /**
     * Returns the number of centroids after merging pending values.
     */
    public int centroidCount()
    {
        mergeNewValues();
        return activeCentroids;
    }

    /**
     * Returns a read-only view of the centroids (compresses first).
     * The returned iterator does not support removal.
     */
    public Collection<Centroid> centroids()
    {
        compress();
        return new AbstractCollection<Centroid>()
        {
            @Override
            public Iterator<Centroid> iterator()
            {
                return new Iterator<Centroid>()
                {
                    int i;

                    @Override
                    public boolean hasNext()
                    {
                        return i < activeCentroids;
                    }

                    @Override
                    public Centroid next()
                    {
                        Centroid rc = new Centroid(mean[i], (int) weight[i]);
                        i++;
                        return rc;
                    }

                    @Override
                    public void remove()
                    {
                        throw new UnsupportedOperationException("Default operation");
                    }
                };
            }

            @Override
            public int size()
            {
                return activeCentroids;
            }
        };
    }

    public double getCompressionFactor()
    {
        return publicCompression;
    }

    /**
     * Returns the exact serialized size in bytes (compresses first, so the
     * value matches what {@link #serialize()} will produce).
     */
    public long estimatedSerializedSizeInBytes()
    {
        compress();
        return SIZE_OF_BYTE // format
                + SIZE_OF_BYTE // type (e.g double, float, bigint)
                + SIZE_OF_DOUBLE // min
                + SIZE_OF_DOUBLE // max
                + SIZE_OF_DOUBLE // compression factor
                + SIZE_OF_DOUBLE // total weight
                + SIZE_OF_INT // number of centroids
                + SIZE_OF_DOUBLE * activeCentroids // weight[], containing weight of each centroid
                + SIZE_OF_DOUBLE * activeCentroids; // mean[], containing mean of each centroid
    }

    public long estimatedInMemorySizeInBytes()
    {
        return INSTANCE_SIZE + sizeOf(weight) + sizeOf(mean) + sizeOf(tempWeight) + sizeOf(tempMean) + sizeOf(order);
    }

    /**
     * Serializes to the format read back by {@link #createTDigest(Slice)}:
     * format byte, type byte, min, max, compression, total weight,
     * centroid count, then the weight and mean arrays.
     */
    public Slice serialize()
    {
        SliceOutput sliceOutput = new DynamicSliceOutput(toIntExact(estimatedSerializedSizeInBytes()));

        sliceOutput.writeByte(0); // version 0 of T-Digest serialization
        sliceOutput.writeByte(0); // represents the underlying data type of the distribution
        sliceOutput.writeDouble(min);
        sliceOutput.writeDouble(max);
        sliceOutput.writeDouble(publicCompression);
        sliceOutput.writeDouble(totalWeight);
        sliceOutput.writeInt(activeCentroids);
        sliceOutput.writeBytes(wrappedDoubleArray(weight), 0, activeCentroids * SIZE_OF_DOUBLE);
        sliceOutput.writeBytes(wrappedDoubleArray(mean), 0, activeCentroids * SIZE_OF_DOUBLE);
        return sliceOutput.slice();
    }

    private void setMinMax(double min, double max)
    {
        this.min = min;
        this.max = max;
    }

    public double getMin()
    {
        return min;
    }

    public double getMax()
    {
        return max;
    }

    // NOTE(review): getQuantile(0.5) may trigger compress(), so toString()
    // has the side effect of merging buffered values
    public String toString()
    {
        return format("TDigest\nCompression:%s\nCentroid Count:%s\nSize:%s\nMin:%s Median:%s Max:%s",
                publicCompression, activeCentroids, totalWeight, min, getQuantile(0.5), max);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.tdigest; + +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +import static java.lang.String.format; + +public final class TDigestUtils +{ + private TDigestUtils() {} + + static double weightedAverage(double x1, double w1, double x2, double w2) + { + if (x1 <= x2) { + return weightedAverageSorted(x1, w1, x2, w2); + } + else { + return weightedAverageSorted(x2, w2, x1, w1); + } + } + + private static double weightedAverageSorted(double x1, double w1, double x2, double w2) + { + final double x = (x1 * w1 + x2 * w2) / (w1 + w2); + return Math.max(x1, Math.min(x, x2)); + } + + // Scale Functions + public static double maxSize(double q, double compression, double n) + { + return Z(compression, n) * q * (1 - q) / compression; + } + + public static double maxSize(double q, double normalizer) + { + return q * (1 - q) / normalizer; + } + + public static double normalizer(double compression, double n) + { + return compression / Z(compression, n); + } + + private static double Z(double compression, double n) + { + return 4 * Math.log(n / compression) + 24; + } + + // Sorting Functions + private static final Random prng = ThreadLocalRandom.current(); // for choosing pivots during quicksort + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes 0..values.length + * + * @param order Indexes into values + * @param values The values to sort. 
+ */ + public static void sort(int[] order, double[] values) + { + sort(order, values, 0, values.length); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes 0..n + * + * @param order Indexes into values + * @param values The values to sort. + * @param n The number of values to sort + */ + public static void sort(int[] order, double[] values, int n) + { + sort(order, values, 0, n); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes start..n + * + * @param order Indexes into values + * @param values The values to sort. + * @param start The first element to sort + * @param n The number of values to sort + */ + public static void sort(int[] order, double[] values, int start, int n) + { + for (int i = start; i < start + n; i++) { + order[i] = i; + } + quickSort(order, values, start, start + n, 64); + insertionSort(order, values, start, start + n, 64); + } + + /** + * Standard quick sort except that sorting is done on an index array rather than the values themselves + * + * @param order The pre-allocated index array + * @param values The values to sort + * @param start The beginning of the values to sort + * @param end The value after the last value to sort + * @param limit The minimum size to recurse down to. 
+ */ + private static void quickSort(int[] order, double[] values, int start, int end, int limit) + { + // the while loop implements tail-recursion to avoid excessive stack calls on nasty cases + while (end - start > limit) { + // pivot by a random element + int pivotIndex = start + prng.nextInt(end - start); + double pivotValue = values[order[pivotIndex]]; + + // move pivot to beginning of array + swap(order, start, pivotIndex); + + // we use a three way partition because many duplicate values is an important case + + int low = start + 1; // low points to first value not known to be equal to pivotValue + int high = end; // high points to first value > pivotValue + int i = low; // i scans the array + while (i < high) { + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // in-loop: i < high + // in-loop: low < high + // in-loop: i >= low + double vi = values[order[i]]; + if (vi == pivotValue) { + if (low != i) { + swap(order, low, i); + } + else { + i++; + } + low++; + } + else if (vi > pivotValue) { + high--; + swap(order, i, high); + } + else { + // vi < pivotValue + i++; + } + } + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // assert i == high || low == high therefore, we are done with partition + + // at this point, i==high, from [start,low) are == pivot, [low,high) are < and [high,end) are > + // we have to move the values equal to the pivot into the middle. 
To do this, we swap pivot + // values into the top end of the [low,high) range stopping when we run out of destinations + // or when we run out of values to copy + int from = start; + int to = high - 1; + for (i = 0; from < low && to >= low; i++) { + swap(order, from++, to--); + } + if (from == low) { + // ran out of things to copy. This means that the the last destination is the boundary + low = to + 1; + } + else { + // ran out of places to copy to. This means that there are uncopied pivots and the + // boundary is at the beginning of those + low = from; + } + + // now recurse, but arrange it so we handle the longer limit by tail recursion + if (low - start < end - high) { + quickSort(order, values, start, low, limit); + + // this is really a way to do + // quickSort(order, values, high, end, limit); + start = high; + } + else { + quickSort(order, values, high, end, limit); + // this is really a way to do + // quickSort(order, values, start, low, limit); + end = low; + } + } + } + + /** + * Quick sort in place of several paired arrays. On return, + * keys[...] is in order and the values[] arrays will be + * reordered as well in the same way. + * + * @param key Values to sort on + * @param values The auxilliary values to sort. + */ + public static void sort(double[] key, double[]... values) + { + sort(key, 0, key.length, values); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes start..n + * @param key Values to sort on + * @param start The first element to sort + * @param n The number of values to sort + * @param values The auxilliary values to sort. + */ + public static void sort(double[] key, int start, int n, double[]... values) + { + quickSort(key, values, start, start + n, 8); + insertionSort(key, values, start, start + n, 8); + } + + /** + * Standard quick sort except that sorting rearranges parallel arrays + * + * @param key Values to sort on + * @param values The auxilliary values to sort. 
+ * @param start The beginning of the values to sort + * @param end The value after the last value to sort + * @param limit The minimum size to recurse down to. + */ + private static void quickSort(double[] key, double[][] values, int start, int end, int limit) + { + // the while loop implements tail-recursion to avoid excessive stack calls on nasty cases + while (end - start > limit) { + // median of three values for the pivot + int a = start; + int b = (start + end) / 2; + int c = end - 1; + + int pivotIndex; + double pivotValue; + double va = key[a]; + double vb = key[b]; + double vc = key[c]; + //noinspection Duplicates + if (va > vb) { + if (vc > va) { + // vc > va > vb + pivotIndex = a; + pivotValue = va; + } + else { + // va > vb, va >= vc + if (vc < vb) { + // va > vb > vc + pivotIndex = b; + pivotValue = vb; + } + else { + // va >= vc >= vb + pivotIndex = c; + pivotValue = vc; + } + } + } + else { + // vb >= va + if (vc > vb) { + // vc > vb >= va + pivotIndex = b; + pivotValue = vb; + } + else { + // vb >= va, vb >= vc + if (vc < va) { + // vb >= va > vc + pivotIndex = a; + pivotValue = va; + } + else { + // vb >= vc >= va + pivotIndex = c; + pivotValue = vc; + } + } + } + + // move pivot to beginning of array + swap(start, pivotIndex, key, values); + + // we use a three way partition because many duplicate values is an important case + + int low = start + 1; // low points to first value not known to be equal to pivotValue + int high = end; // high points to first value > pivotValue + int i = low; // i scans the array + while (i < high) { + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // in-loop: i < high + // in-loop: low < high + // in-loop: i >= low + double vi = key[i]; + if (vi == pivotValue) { + if (low != i) { + swap(low, i, key, values); + } + else { + i++; + } + low++; + } + else if (vi > 
pivotValue) { + high--; + swap(i, high, key, values); + } + else { + // vi < pivotValue + i++; + } + } + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // assert i == high || low == high therefore, we are done with partition + + // at this point, i==high, from [start,low) are == pivot, [low,high) are < and [high,end) are > + // we have to move the values equal to the pivot into the middle. To do this, we swap pivot + // values into the top end of the [low,high) range stopping when we run out of destinations + // or when we run out of values to copy + int from = start; + int to = high - 1; + for (i = 0; from < low && to >= low; i++) { + swap(from++, to--, key, values); + } + if (from == low) { + // ran out of things to copy. This means that the last destination is the boundary + low = to + 1; + } + else { + // ran out of places to copy to. This means that there are uncopied pivots and the + // boundary is at the beginning of those + low = from; + } + + // now recurse, but arrange it so we handle the longer limit by tail recursion + if (low - start < end - high) { + quickSort(key, values, start, low, limit); + + // this is really a way to do + // quickSort(key, values, high, end, limit); + start = high; + } + else { + quickSort(key, values, high, end, limit); + // this is really a way to do + // quickSort(key, values, start, low, limit); + end = low; + } + } + } + + /** + * Limited range insertion sort. We assume that no element has to move more than limit steps + * because quick sort has done its thing. This version works on parallel arrays of keys and values. 
+ * + * @param key The array of keys + * @param values The values we are sorting + * @param start The starting point of the sort + * @param end The ending point of the sort + * @param limit The largest amount of disorder + */ + @SuppressWarnings("SameParameterValue") + private static void insertionSort(double[] key, double[][] values, int start, int end, int limit) + { + // loop invariant: all values start ... i-1 are ordered + for (int i = start + 1; i < end; i++) { + double v = key[i]; + int m = Math.max(i - limit, start); + for (int j = i; j >= m; j--) { + if (j == m || key[j - 1] <= v) { + if (j < i) { + System.arraycopy(key, j, key, j + 1, i - j); + key[j] = v; + for (double[] value : values) { + double tmp = value[i]; + System.arraycopy(value, j, value, j + 1, i - j); + value[j] = tmp; + } + } + break; + } + } + } + } + + private static void swap(int[] order, int i, int j) + { + int t = order[i]; + order[i] = order[j]; + order[j] = t; + } + + private static void swap(int i, int j, double[] key, double[]...values) + { + double t = key[i]; + key[i] = key[j]; + key[j] = t; + + for (int k = 0; k < values.length; k++) { + t = values[k][i]; + values[k][i] = values[k][j]; + values[k][j] = t; + } + } + + /** + * Check that a partition step was done correctly. For debugging and testing. + * + * @param order The array of indexes representing a permutation of the keys. + * @param values The keys to sort. + * @param pivotValue The value that splits the data + * @param start The beginning of the data of interest. + * @param low Values from start (inclusive) to low (exclusive) are < pivotValue. + * @param high Values from low to high are equal to the pivot. + * @param end Values from high to end are above the pivot. 
+ */ + public static void checkPartition(int[] order, double[] values, double pivotValue, int start, int low, int high, int end) + { + if (order.length != values.length) { + throw new IllegalArgumentException("Arguments must be same size"); + } + + if (!(start >= 0 && low >= start && high >= low && end >= high)) { + throw new IllegalArgumentException(format("Invalid indices %d, %d, %d, %d", start, low, high, end)); + } + + for (int i = 0; i < low; i++) { + double v = values[order[i]]; + if (v >= pivotValue) { + throw new IllegalArgumentException(format("Value greater than pivot at %d", i)); + } + } + + for (int i = low; i < high; i++) { + if (values[order[i]] != pivotValue) { + throw new IllegalArgumentException(format("Non-pivot at %d", i)); + } + } + + for (int i = high; i < end; i++) { + double v = values[order[i]]; + if (v <= pivotValue) { + throw new IllegalArgumentException(format("Value less than pivot at %d", i)); + } + } + } + + /** + * Limited range insertion sort. We assume that no element has to move more than limit steps + * because quick sort has done its thing. + * + * @param order The permutation index + * @param values The values we are sorting + * @param start Where to start the sort + * @param n How many elements to sort + * @param limit The largest amount of disorder + */ + @SuppressWarnings("SameParameterValue") + private static void insertionSort(int[] order, double[] values, int start, int n, int limit) + { + for (int i = start + 1; i < n; i++) { + int t = order[i]; + double v = values[order[i]]; + int m = Math.max(i - limit, start); + for (int j = i; j >= m; j--) { + // j == m (not j == 0): the insertion window starts at m; when start > 0, a j == 0 + // guard would compare against order[m - 1], which lies outside the range being sorted. + // This matches the boundary check in the parallel-array insertionSort variant. + if (j == m || values[order[j - 1]] <= v) { + if (j < i) { + System.arraycopy(order, j, order, j + 1, i - j); + order[j] = t; + } + break; + } + } + } + } + + /** + * Reverses an array in-place. + * + * @param order The array to reverse + */ + public static void reverse(int[] order) + { + reverse(order, 0, order.length); + } + + /** + * Reverses part of an array. 
See {@link #reverse(int[])} + * + * @param order The array containing the data to reverse. + * @param offset Where to start reversing. + * @param length How many elements to reverse + */ + public static void reverse(int[] order, int offset, int length) + { + for (int i = 0; i < length / 2; i++) { + int t = order[offset + i]; + order[offset + i] = order[offset + length - i - 1]; + order[offset + length - i - 1] = t; + } + } + + /** + * Reverses part of an array. See {@link #reverse(int[])} + * + * @param order The array containing the data to reverse. + * @param offset Where to start reversing. + * @param length How many elements to reverse + */ + @SuppressWarnings("SameParameterValue") + public static void reverse(double[] order, int offset, int length) + { + for (int i = 0; i < length / 2; i++) { + double t = order[offset + i]; + order[offset + i] = order[offset + length - i - 1]; + order[offset + length - i - 1] = t; + } + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/tdigest/docs/t-digest.md b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/t-digest.md new file mode 100644 index 0000000000000..af1a825c76fdf --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/t-digest.md @@ -0,0 +1,33 @@ +# T-Digest + +## Background + +The implementation of t-digest in this library is based on [t-digest [Dunning '19]](https://arxiv.org/abs/1902.04023). It has been modified to support Slice serialization. + +## Format + +_Unless otherwise noted, all values are little-endian._ + +### T-Digest Layout + +![T-Digest Layout](tdigest.png) + +The t-digest algorithm uses a set of centroids to store a distribution of values. A centroid is a data structure that has an associated mean and weight. In this format, the centroids that make up the t-digest are serialized by storing the two metrics mentioned. 
+ +* format: byte value which represents the format used for serialization (currently just one format exists, `0`) +* type: byte value which represents the data type of the values in the distribution +* min: represents the minimum value of the distribution represented as a double +* max: represents the maximum value of the distribution represented as a double +* compression: the compression factor used when creating the t-digest +* total weight: the total number of values in the t-digest +* activeCentroids: the number of centroids used by the t-digest +* centroids: the sequence of centroids which represent the t-digest, ordered by ascending mean + +#### Centroid Layout + +![T-Digest Centroid Entry](tdigest_centroids.png) + +The centroids section in the t-digest consists of two arrays of length `activeCentroids`: + +* weight[]: an array with the number of values contained in each centroid +* mean[]: an array with the average value of each centroid \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.graffle b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.graffle new file mode 100644 index 0000000000000..7bbf5cd5d95db --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.graffle @@ -0,0 +1,1662 @@ + + + + + ApplicationVersion + + com.omnigroup.OmniGraffle7 + 195.13.0.328142 + + CreationDate + 2019-06-19 23:03:59 +0000 + Creator + Gaston Arguindegui + GraphDocumentVersion + 16 + GuidesLocked + NO + GuidesVisible + YES + ImageCounter + 1 + LinksVisible + NO + MagnetsVisible + NO + MasterSheets + + MovementHandleVisible + NO + NotesVisible + NO + OriginVisible + NO + PageBreaks + YES + PrintInfo + + NSBottomMargin + + float + 41 + + NSHorizonalPagination + + coded + BAtzdHJlYW10eXBlZIHoA4QBQISEhAhOU051bWJlcgCEhAdOU1ZhbHVlAISECE5TT2JqZWN0AIWEASqEhAFxlwCG + + NSLeftMargin + + float + 18 + + NSPaperSize + + size + {612, 792} + + NSPrintReverseOrientation + + coded + 
BAtzdHJlYW10eXBlZIHoA4QBQISEhAhOU051bWJlcgCEhAdOU1ZhbHVlAISECE5TT2JqZWN0AIWEASqEhAFxlwCG + + NSRightMargin + + float + 18 + + NSTopMargin + + float + 18 + + + ReadOnly + NO + Sheets + + + ActiveLayerIndex + 0 + AutoAdjust + 6 + AutosizingMargin + 72 + BackgroundGraphic + + Bounds + {{0, 0}, {733, 576}} + Class + GraffleShapes.CanvasBackgroundGraphic + ID + 0 + Style + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + TextAlongPathGlyphAnchor + center + + + BaseZoom + 0 + CanvasDimensionsOrigin + {0, 0} + CanvasOrigin + {0, 0} + CanvasSizingMode + 1 + ColumnAlign + 1 + ColumnSpacing + 36 + DisplayScale + 1 in = 1.00000 in + GraphicsList + + + Class + LineGraphic + ID + 139 + LogicalPath + + elements + + + element + MOVETO + point + {184.84121889575664, 180.66666666666666} + + + element + LINETO + point + {185.93107292003268, 110} + + + + Points + + {184.84121889575664, 180.66666666666666} + {185.93107292003268, 110} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Bounds + {{159.86669081575391, 181.04011151904371}, {47, 27.999988876101849}} + Class + ShapedGraphic + ID + 138 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 total weight \ +(1 double)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Bounds + {{168.70002414908734, 94.986664558450457}, {36, 13.999994438050924}} + Class + ShapedGraphic + ID + 137 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{48.601733947793605, 169.16709747692138}, {37.29150390625, 26}} + Class + ShapedGraphic + FitText + YES + 
Flow + Resize + FontInfo + + Color + + space + gg22 + w + 0 + + Size + 11 + + ID + 136 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs22 \cf0 type\ +(1 byte)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + ID + 135 + LogicalPath + + elements + + + element + MOVETO + point + {56.666645986841068, 167.52621614696335} + + + element + LINETO + point + {56.533306241398691, 110} + + + + Points + + {56.666645986841068, 167.52621614696335} + {56.533306241398691, 110} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Bounds + {{52.533306241398691, 94.986720085144043}, {9, 14}} + Class + ShapedGraphic + ID + 134 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{127.20002414908731, 218.01295966264541}, {47, 27.999988876101849}} + Class + ShapedGraphic + ID + 131 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 compression \ +(1 double)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + Head + + ID + 133 + + ID + 132 + LogicalPath + + elements + + + element + MOVETO + point + {150.70002414908734, 218.01295966264541} + + + element + LINETO + point + 
{150.70002414908734, 109.48671452319495} + + + + Points + + {150.70002414908734, 218.01295966264541} + {150.70002414908734, 109.48671452319495} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + Tail + + ID + 131 + + + + Bounds + {{132.70002414908734, 94.986720085144043}, {36, 13.999994438050924}} + Class + ShapedGraphic + ID + 133 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{89.682446828750074, 181.04011151904371}, {54.03515625, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 128 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 max\ +(1 double)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + Head + + ID + 130 + + ID + 129 + LogicalPath + + elements + + + element + MOVETO + point + {116.399122613057, 181.04011151904371} + + + element + LINETO + point + {114.86122015494765, 109.48654911092339} + + + + Points + + {116.399122613057, 181.04011151904371} + {114.86122015494765, 109.48654911092339} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + Tail + + ID + 128 + + + + Bounds + {{96.700024953750017, 94.986664558450457}, {36, 14}} + Class + ShapedGraphic + ID + 130 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{28.533306241398805, 46.066613891428744}, {37, 14}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 127 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + 
{\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 Layout} + TextAlongPathGlyphAnchor + left + VerticalPad + 0.0 + + Wrap + NO + + + Bounds + {{185.0399958640337, 259.19999420642853}, {82.705078125, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 91 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 activeCentroids\ +(1 int)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + ID + 93 + LogicalPath + + elements + + + element + MOVETO + point + {222.38426813609101, 258} + + + element + LINETO + point + {219.71760146942427, 108} + + + + Points + + {222.38426813609101, 258} + {219.71760146942427, 108} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Bounds + {{204.38426813609101, 94.986844558450699}, {18, 14}} + Class + ShapedGraphic + ID + 96 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{332.6883282804597, 217.76023069828778}, {136.734375, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 84 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} 
+\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 centroids\ +(16 bytes * # of centroids)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Bounds + {{61.54908701280732, 218.01291853874716}, {54.03515625, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 83 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 min \ +(1 double)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Bounds + {{26.639999404549599, 268.97396151970338}, {55, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 81 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 format = 0\ +(1 byte)} + TextAlongPathGlyphAnchor + left + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + Head + + ID + 69 + + ID + 80 + LogicalPath + + elements + + + element + MOVETO + point + {48, 266.33333333333331} + + + element + LINETO + point + {48.000020677902718, 109.48672008514401} + + + + Points + + {48, 266.33333333333331} + {48.000020677902718, 109.48672008514401} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Class + 
LineGraphic + ID + 79 + LogicalPath + + elements + + + element + MOVETO + point + {86.592592804473995, 219.96077055845052} + + + element + LINETO + point + {87.682446828750017, 110} + + + + Points + + {86.592592804473995, 219.96077055845052} + {87.682446828750017, 110} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Class + LineGraphic + Head + + ID + 71 + + ID + 77 + LogicalPath + + elements + + + element + MOVETO + point + {400.9835956074657, 217.76023069828778} + + + element + LINETO + point + {400.42737774400473, 109.48671348768369} + + + + Points + + {400.9835956074657, 217.76023069828778} + {400.42737774400473, 109.48671348768369} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + Tail + + ID + 84 + + + + Bounds + {{222.27702935417665, 94.986720085144043}, {356.22363951923273, 14}} + Class + ShapedGraphic + ID + 71 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{61.06666513780732, 94.986754558450528}, {36, 14}} + Class + ShapedGraphic + ID + 70 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{43.500021666666626, 94.986720085144043}, {9, 14}} + Class + ShapedGraphic + ID + 69 + Text + + TextAlongPathGlyphAnchor + center + + + + GridInfo + + HPages + 1 + KeepToScale + + Layers + + + Artboards + + Lock + + Name + Layer 1 + Print + + View + + + + LayoutInfo + + Animate + NO + circoMinDist + 18 + circoSeparation + 0.0 + layoutEngine + dot + neatoLineLength + 0.20000000298023224 + neatoSeparation + 0.0 + twopiSeparation + 0.0 + + Orientation + 1 + PrintOnePage + + RowAlign + 1 + RowSpacing + 36 + SheetTitle + Layout + UniqueID + 3 + VPages + 1 + VisibleVoidKey + 1 + + + ActiveLayerIndex + 0 + AutoAdjust + 6 + AutosizingMargin + 72 + BackgroundGraphic + + Bounds + {{0, 0}, {733, 576}} + Class + GraffleShapes.CanvasBackgroundGraphic + ID + 0 + Style + + shadow + + Draws + 
NO + + stroke + + Draws + NO + + + Text + + TextAlongPathGlyphAnchor + center + + + BaseZoom + 0 + CanvasDimensionsOrigin + {0, 0} + CanvasOrigin + {0, 0} + CanvasSizingMode + 1 + ColumnAlign + 1 + ColumnSpacing + 36 + DisplayScale + 1 in = 1.00000 in + GraphicsList + + + Bounds + {{257.71184377446417, 188.67875910191651}, {94.04296875, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 118 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 mean []\ +(array of doubles)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + ID + 119 + LogicalPath + + elements + + + element + MOVETO + point + {305.36292054282848, 188.34542576858314} + + + element + LINETO + point + {304.36292054282848, 91.01209243524977} + + + + Points + + {305.36292054282848, 188.34542576858314} + {304.36292054282848, 91.01209243524977} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Bounds + {{232.73332814946434, 73.319362788286639}, {144, 14}} + Class + ShapedGraphic + ID + 120 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{113.71184377446423, 190.70482208221301}, {94.04296875, 28}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 124 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} 
+\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 weight []\ +(array of doubles)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + Class + LineGraphic + ID + 125 + LogicalPath + + elements + + + element + MOVETO + point + {160.23332814946434, 187.01209243524988} + + + element + LINETO + point + {160.23332814946434, 91.01209243524977} + + + + Points + + {160.23332814946434, 187.01209243524988} + {160.23332814946434, 91.01209243524977} + + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + HeadArrow + FilledArrow + Legacy + + LineType + 1 + + + + + Bounds + {{88.733328149464228, 73.319362788286639}, {144, 14}} + Class + ShapedGraphic + ID + 126 + Text + + TextAlongPathGlyphAnchor + center + + + + Bounds + {{14.577296911059193, 30.959999307990074}, {182.42578125, 14}} + Class + ShapedGraphic + FitText + YES + Flow + Resize + ID + 68 + Style + + fill + + Draws + NO + + shadow + + Draws + NO + + stroke + + Draws + NO + + + Text + + Pad + 0.0 + Text + {\rtf1\ansi\ansicpg1252\cocoartf1671\cocoasubrtf400 +{\fonttbl\f0\fswiss\fcharset0 Helvetica;} +{\colortbl;\red255\green255\blue255;} +{\*\expandedcolortbl;;} +\pard\tx560\tx1120\tx1680\tx2240\tx2800\tx3360\tx3920\tx4480\tx5040\tx5600\tx6160\tx6720\pardirnatural\qc\partightenfactor0 + +\f0\fs24 \cf0 Centroids section layout (type = 0)} + TextAlongPathGlyphAnchor + center + VerticalPad + 0.0 + + Wrap + NO + + + GridInfo + + HPages + 1 + KeepToScale + + Layers + + + Artboards + + Lock + + Name + Layer 1 + Print + + View + + + + LayoutInfo + + Animate + NO + circoMinDist + 18 + circoSeparation + 0.0 + layoutEngine + dot + neatoLineLength + 0.20000000298023224 + neatoSeparation + 0.0 + twopiSeparation + 0.0 + + Orientation + 1 + PrintOnePage + + RowAlign + 1 + RowSpacing + 36 + SheetTitle + Centroid Layout + UniqueID + 7 + VPages + 1 + VisibleVoidKey + 1 + + + SmartAlignmentGuidesActive + 
YES + SmartDistanceGuidesActive + YES + UseEntirePage + + WindowInfo + + CurrentSheet + 1 + Frame + {{2065, 871}, {1680, 878}} + ShowInfo + + ShowRuler + + Sidebar + + SidebarWidth + 244 + Sidebar_Tab + 0 + VisibleRegion + {{2.7027027027027359, -2.0270270270269481}, {766.21621621621625, 518.91891891891896}} + WindowInfo_InspectorTab + + com.omnigroup.OmniGraffle.inspectorGroup.object + + ZoomValues + + + Layout + 1.5 + 1 + + + Centroid Layout + 1.48 + 1 + + + + compressOnDiskKey + + copyLinkedImagesKey + + createSinglePDFKey + 0 + exportAreaKey + 1 + exportQualityKey + 100 + exportSizesKey + + 1 + + + fileFormatKey + 0 + htmlImageTypeKey + 0 + includeBackgroundGraphicKey + + includeNonPrintingLayersKey + + lastExportTypeKey + 0 + marginWidthKey + 0.0 + readOnlyKey + + resolutionForBMPKey + 1 + resolutionForGIFKey + 1 + resolutionForHTMLKey + 1 + resolutionForJPGKey + 1 + resolutionForPNGKey + 1 + resolutionForTIFFKey + 1 + resolutionUnitsKey + 0 + saveAsFlatFileOptionKey + 1 + useArtboardsKey + + useMarginKey + + useNotesKey + + + diff --git a/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.png b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.png new file mode 100644 index 0000000000000..3778cdbbe1727 Binary files /dev/null and b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest.png differ diff --git a/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest_centroids.png b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest_centroids.png new file mode 100644 index 0000000000000..7a9877db1e0ba Binary files /dev/null and b/presto-main/src/main/java/com/facebook/presto/tdigest/docs/tdigest_centroids.png differ diff --git a/presto-main/src/test/java/com/facebook/presto/tdigest/BenchmarkTDigest.java b/presto-main/src/test/java/com/facebook/presto/tdigest/BenchmarkTDigest.java new file mode 100644 index 0000000000000..8315240a90c29 --- /dev/null +++ 
b/presto-main/src/test/java/com/facebook/presto/tdigest/BenchmarkTDigest.java @@ -0,0 +1,203 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.facebook.presto.tdigest; + +import io.airlift.slice.Slice; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import static com.facebook.presto.tdigest.TDigest.createTDigest; + +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(3) +@Warmup(iterations = 10) +@Measurement(iterations = 10) +public class BenchmarkTDigest +{ + private static final int NUMBER_OF_ENTRIES = 1_000_000; + private static final int STANDARD_COMPRESSION_FACTOR = 100; + + @State(Scope.Thread) + public static class Data + { + private long[] normalDistribution1; + private long[] normalDistribution2; + + @Setup + public void setup() + { + normalDistribution1 = makeNormalValues(NUMBER_OF_ENTRIES); + normalDistribution2 = makeNormalValues(NUMBER_OF_ENTRIES); + } + + private long[] makeNormalValues(int size) + { + long[] values = new long[size]; + for (int i = 0; i < size; i++) { + long value = Math.abs((long) (ThreadLocalRandom.current().nextGaussian() * 1_000_000_000)); + values[i] = value; + } + + return values; + } + + private long[] makeRandomValues(int size) + { + long[] values = new long[size]; + for (int i = 0; i < size; i++) { + values[i] = (long) 
(Math.random() * 1_000_000_000); + } + + return values; + } + } + + @State(Scope.Thread) + public static class Digest + { + protected TDigest digest1; + protected TDigest digest2; + protected Slice serializedDigest1; + + @Setup + public void setup(Data data) + { + digest1 = makeTDigest(data.normalDistribution1); + digest2 = makeTDigest(data.normalDistribution2); + serializedDigest1 = digest1.serialize(); + } + + private TDigest makeTDigest(long[] values) + { + TDigest result = createTDigest(STANDARD_COMPRESSION_FACTOR); + int k = 1_000_000 / NUMBER_OF_ENTRIES; + for (int i = 0; i < k; i++) { + for (long value : values) { + result.add(value); + } + } + return result; + } + } + + @State(Scope.Thread) + public static class DigestWithQuantile + extends Digest + { + // allows testing how fast different quantiles can be computed + @Param({"0.0001", "0.01", "0.2500", "0.5000", "0.7500", "0.9999"}) + float quantile; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OperationsPerInvocation(NUMBER_OF_ENTRIES) + public TDigest benchmarkInsertsT(Data data) + { + TDigest digest = createTDigest(STANDARD_COMPRESSION_FACTOR); + int k = 1_000_000 / NUMBER_OF_ENTRIES; + for (int i = 0; i < k; i++) { + for (long value : data.normalDistribution1) { + digest.add(value); + } + } + + return digest; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public double benchmarkQuantilesT(DigestWithQuantile data) + { + return data.digest1.getQuantile(data.quantile); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkCopyT(Digest data) + { + TDigest copy = createTDigest(data.digest1.getCompressionFactor()); + copy.merge(data.digest1); + return copy; + } + + @Benchmark @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkMergeT(Digest data) + { + TDigest merged = createTDigest(data.digest1.serialize()); + merged.merge(data.digest2); + return merged; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkDeserializeT(Digest 
data) + { + return createTDigest(data.serializedDigest1); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public Slice benchmarkSerializeT(Digest data) + { + return data.digest1.serialize(); + } + + public static void main(String[] args) + throws RunnerException + { + Options options = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + BenchmarkTDigest.class.getSimpleName() + ".*") + .build(); + + new Runner(options).run(); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/tdigest/TestTDigest.java b/presto-main/src/test/java/com/facebook/presto/tdigest/TestTDigest.java new file mode 100644 index 0000000000000..c72cb9579a5a5 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/tdigest/TestTDigest.java @@ -0,0 +1,369 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.facebook.presto.tdigest;

import org.apache.commons.math3.distribution.BinomialDistribution;
import org.apache.commons.math3.distribution.GeometricDistribution;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.commons.math3.distribution.PoissonDistribution;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.tdigest.TDigest.createTDigest;
import static java.lang.String.format;
import static java.util.Collections.sort;
import static org.testng.Assert.assertTrue;

/**
 * Verifies that the quantiles reported by {@link TDigest} stay within the
 * expected rank error (+/- STANDARD_ERROR) of the true empirical quantiles for
 * a variety of continuous and discrete input distributions, including digests
 * built incrementally and digests produced by merging.
 */
public class TestTDigest
{
    private static final int NUMBER_OF_ENTRIES = 1_000_000;
    private static final int STANDARD_COMPRESSION_FACTOR = 100;
    // Maximum allowed rank error on either side of the queried quantile.
    private static final double STANDARD_ERROR = 0.01;
    private static final double[] QUANTILES = {0.0001, 0.0200, 0.0300, 0.0400, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000,
            0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999};

    @Test
    public void testAddElementsInOrder()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Integer> list = new ArrayList<>();

        for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
            tDigest.add(i);
            list.add(i);
        }

        for (double quantile : QUANTILES) {
            assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    @Test
    public void testMergeTwoDistributionsWithoutOverlap()
    {
        TDigest tDigest1 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        TDigest tDigest2 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Integer> list = new ArrayList<>();

        // first digest holds the lower half of the range, second the upper half
        for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) {
            tDigest1.add(i);
            tDigest2.add(i + NUMBER_OF_ENTRIES / 2);
            list.add(i);
            list.add(i + NUMBER_OF_ENTRIES / 2);
        }

        tDigest1.merge(tDigest2);
        sort(list);

        for (double quantile : QUANTILES) {
            assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest1);
        }
    }

    @Test
    public void testMergeTwoDistributionsWithOverlap()
    {
        TDigest tDigest1 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        TDigest tDigest2 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Integer> list = new ArrayList<>();

        // both digests receive identical values, so they fully overlap
        for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) {
            tDigest1.add(i);
            tDigest2.add(i);
            list.add(i);
            list.add(i);
        }

        tDigest2.merge(tDigest1);
        sort(list);

        for (double quantile : QUANTILES) {
            assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest2);
        }
    }

    @Test
    public void testAddElementsRandomized()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();

        for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
            double value = Math.random() * NUMBER_OF_ENTRIES;
            tDigest.add(value);
            list.add(value);
        }

        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    @Test
    public void testNormalDistributionLowVariance()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();
        NormalDistribution normal = new NormalDistribution(1000, 1);

        for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
            double value = normal.sample();
            tDigest.add(value);
            list.add(value);
        }

        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    @Test
    public void testNormalDistributionHighVariance()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();
        NormalDistribution normal = new NormalDistribution(0, 1);

        for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
            double value = normal.sample();
            tDigest.add(value);
            list.add(value);
        }

        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    @Test
    public void testMergeTwoNormalDistributions()
    {
        TDigest tDigest1 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        TDigest tDigest2 = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();
        NormalDistribution normal = new NormalDistribution(0, 50);

        for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) {
            double value1 = normal.sample();
            double value2 = normal.sample();
            tDigest1.add(value1);
            tDigest2.add(value2);
            list.add(value1);
            list.add(value2);
        }

        tDigest1.merge(tDigest2);
        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest1);
        }
    }

    @Test
    public void testMergeManySmallNormalDistributions()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();
        NormalDistribution normal = new NormalDistribution(500, 20);
        int digests = 100_000;

        // merge many tiny (10-value) digests into one
        for (int k = 0; k < digests; k++) {
            TDigest current = createTDigest(STANDARD_COMPRESSION_FACTOR);
            for (int i = 0; i < 10; i++) {
                double value = normal.sample();
                current.add(value);
                list.add(value);
            }
            tDigest.merge(current);
        }

        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    @Test
    public void testMergeManyLargeNormalDistributions()
    {
        TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
        List<Double> list = new ArrayList<>();
        NormalDistribution normal = new NormalDistribution(500, 20);
        int digests = 1000;

        // merge a moderate number of large (1000-value) digests into one
        for (int k = 0; k < digests; k++) {
            TDigest current = createTDigest(STANDARD_COMPRESSION_FACTOR);
            for (int i = 0; i < NUMBER_OF_ENTRIES / digests; i++) {
                double value = normal.sample();
                current.add(value);
                list.add(value);
            }
            tDigest.merge(current);
        }

        sort(list);

        for (double quantile : QUANTILES) {
            assertContinuousWithinBound(quantile, STANDARD_ERROR, list, tDigest);
        }
    }

    // disabled because test takes almost 10s
    @Test(enabled = false)
    public void testBinomialDistribution()
    {
        int trials = 10;
        for (int k = 1; k < trials; k++) {
            TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
            BinomialDistribution binomial = new BinomialDistribution(trials, k * 0.1);
            List<Integer> list = new ArrayList<>();

            for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
                int sample = binomial.sample();
                tDigest.add(sample);
                list.add(sample);
            }

            sort(list);

            for (double quantile : QUANTILES) {
                assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest);
            }
        }
    }

    @Test(enabled = false)
    public void testGeometricDistribution()
    {
        int trials = 10;
        for (int k = 1; k < trials; k++) {
            TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
            GeometricDistribution geometric = new GeometricDistribution(k * 0.1);
            List<Integer> list = new ArrayList<>();

            for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
                int sample = geometric.sample();
                tDigest.add(sample);
                list.add(sample);
            }

            sort(list);

            for (double quantile : QUANTILES) {
                assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest);
            }
        }
    }

    @Test(enabled = false)
    public void testPoissonDistribution()
    {
        int trials = 10;
        for (int k = 1; k < trials; k++) {
            TDigest tDigest = createTDigest(STANDARD_COMPRESSION_FACTOR);
            PoissonDistribution poisson = new PoissonDistribution(k * 0.1);
            List<Integer> list = new ArrayList<>();

            for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
                int sample = poisson.sample();
                tDigest.add(sample);
                list.add(sample);
            }

            sort(list);

            for (double quantile : QUANTILES) {
                assertDiscreteWithinBound(quantile, STANDARD_ERROR, list, tDigest);
            }
        }
    }

    /**
     * Returns the sample value at the given rank: the digest's min for ranks
     * below 0, the digest's max for ranks at or above 1, otherwise the element
     * of the sorted sample at index floor(size * rank).
     */
    private static double valueAtRank(double rank, List<? extends Number> sortedValues, TDigest tDigest)
    {
        if (rank < 0) {
            return tDigest.getMin();
        }
        if (rank >= 1) {
            return tDigest.getMax();
        }
        return sortedValues.get((int) (sortedValues.size() * rank)).doubleValue();
    }

    /**
     * Asserts that the digest's estimate for {@code quantile} lies between the
     * true sample values at ranks {@code quantile - bound} and {@code quantile + bound}.
     */
    private void assertContinuousWithinBound(double quantile, double bound, List<? extends Number> sortedValues, TDigest tDigest)
    {
        double lowerBound = valueAtRank(quantile - bound, sortedValues, tDigest);
        double upperBound = valueAtRank(quantile + bound, sortedValues, tDigest);
        double value = tDigest.getQuantile(quantile);

        assertTrue(value >= lowerBound && value <= upperBound,
                format("Value %s is outside bound [%s, %s] for quantile %s",
                        value, lowerBound, upperBound, quantile));
    }

    /**
     * Like {@link #assertContinuousWithinBound} but for discrete samples.
     */
    private void assertDiscreteWithinBound(double quantile, double bound, List<? extends Number> sortedValues, TDigest tDigest)
    {
        double lowerBound = valueAtRank(quantile - bound, sortedValues, tDigest);
        double upperBound = valueAtRank(quantile + bound, sortedValues, tDigest);
        // for discrete distributions, t-digest usually gives back a double that is between 2 integers (2.88, 1.16, etc)
        // however, a discrete distribution should return an integer, not something in between
        // we use Math.rint to round to the nearest integer, since casting as (int) always rounds down and no casting results in error > 1%
        double rounded = Math.rint(tDigest.getQuantile(quantile));

        assertTrue(rounded >= lowerBound && rounded <= upperBound,
                format("Value %s is outside bound [%s, %s] for quantile %s", tDigest.getQuantile(quantile), lowerBound, upperBound, quantile));
    }
}