diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/AbstractTDigest.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/AbstractTDigest.java new file mode 100644 index 0000000000000..af13f1c0e79c9 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/AbstractTDigest.java @@ -0,0 +1,150 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +public abstract class AbstractTDigest extends TDigest { + final Random gen = new Random(); + boolean recordAllData = false; + + /** + * Same as {@link #weightedAverageSorted(double, double, double, double)} but flips + * the order of the variables if x2 is greater than + * x1. + */ + static double weightedAverage(double x1, double w1, double x2, double w2) { + if (x1 <= x2) { + return weightedAverageSorted(x1, w1, x2, w2); + } else { + return weightedAverageSorted(x2, w2, x1, w1); + } + } + + /** + * Compute the weighted average between x1 with a weight of + * w1 and x2 with a weight of w2. + * This expects x1 to be less than or equal to x2 + * and is guaranteed to return a number between x1 and + * x2. + */ + private static double weightedAverageSorted(double x1, double w1, double x2, double w2) { + assert x1 <= x2; + final double x = (x1 * w1 + x2 * w2) / (w1 + w2); + return Math.max(x1, Math.min(x, x2)); + } + + static double interpolate(double x, double x0, double x1) { + return (x - x0) / (x1 - x0); + } + + static void encode(ByteBuffer buf, int n) { + int k = 0; + while (n < 0 || n > 0x7f) { + byte b = (byte) (0x80 | (0x7f & n)); + buf.put(b); + n = n >>> 7; + k++; + if (k >= 6) { + throw new IllegalStateException("Size is implausibly large"); + } + } + buf.put((byte) n); + } + + static int decode(ByteBuffer buf) { + int v = buf.get(); + int z = 0x7f & v; + int shift = 7; + while ((v & 0x80) != 0) { + if (shift > 28) { + throw new IllegalStateException("Shift too large in decode"); + } + v = buf.get(); + z += (v & 0x7f) << shift; + shift += 7; + } + return z; + } + + abstract void add(double x, int w, Centroid base); + + /** + * Computes an interpolated value of a quantile that is between two centroids. + * + * Index is the quantile desired multiplied by the total number of samples - 1. + * + * @param index Denormalized quantile desired + * @param previousIndex The denormalized quantile corresponding to the center of the previous centroid. + * @param nextIndex The denormalized quantile corresponding to the center of the following centroid. + * @param previousMean The mean of the previous centroid. + * @param nextMean The mean of the following centroid. 
+ * @return The interpolated mean. + */ + static double quantile(double index, double previousIndex, double nextIndex, double previousMean, double nextMean) { + final double delta = nextIndex - previousIndex; + final double previousWeight = (nextIndex - index) / delta; + final double nextWeight = (index - previousIndex) / delta; + return previousMean * previousWeight + nextMean * nextWeight; + } + + /** + * Sets up so that all centroids will record all data assigned to them. For testing only, really. + */ + @Override + public TDigest recordAllData() { + recordAllData = true; + return this; + } + + @Override + public boolean isRecording() { + return recordAllData; + } + + /** + * Adds a sample to a histogram. + * + * @param x The value to add. + */ + @Override + public void add(double x) { + add(x, 1); + } + + @Override + public void add(TDigest other) { + List tmp = new ArrayList<>(); + for (Centroid centroid : other.centroids()) { + tmp.add(centroid); + } + + Collections.shuffle(tmp, gen); + for (Centroid centroid : tmp) { + add(centroid.mean(), centroid.count(), centroid); + } + } + + protected Centroid createCentroid(double mean, int id) { + return new Centroid(mean, id, recordAllData); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/Centroid.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/Centroid.java new file mode 100644 index 0000000000000..28afa394e4ff5 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/Centroid.java @@ -0,0 +1,192 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A single centroid which represents a number of data points. + */ +public class Centroid + implements Comparable, Serializable +{ + private static final AtomicInteger uniqueCount = new AtomicInteger(1); + + private double centroid; + private int count; + + // The ID is transient because it must be unique within a given JVM. A new + // ID should be generated from uniqueCount when a Centroid is deserialized. 
+ private transient int id; + + private List actualData; + + private Centroid(boolean record) + { + id = uniqueCount.getAndIncrement(); + if (record) { + actualData = new ArrayList<>(); + } + } + + public Centroid(double x) + { + this(false); + start(x, 1, uniqueCount.getAndIncrement()); + } + + public Centroid(double x, int w) + { + this(false); + start(x, w, uniqueCount.getAndIncrement()); + } + + public Centroid(double x, int w, int id) + { + this(false); + start(x, w, id); + } + + public Centroid(double x, int id, boolean record) + { + this(record); + start(x, 1, id); + } + + Centroid(double x, int w, List data) + { + this(x, w); + actualData = data; + } + + private void start(double x, int w, int id) + { + this.id = id; + add(x, w); + } + + public void add(double x, int w) + { + if (actualData != null) { + actualData.add(x); + } + count += w; + centroid += w * (x - centroid) / count; + } + + public double mean() + { + return centroid; + } + + public int count() + { + return count; + } + + public int id() + { + return id; + } + + @Override + public String toString() + { + return "Centroid{" + + "centroid=" + centroid + + ", count=" + count + + '}'; + } + + @Override + public int hashCode() + { + return id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Centroid centroid = (Centroid) o; + return this.centroid == centroid.mean() && this.count == centroid.count(); + } + + @Override + public int compareTo(@SuppressWarnings("NullableProblems") Centroid o) + { + int r = Double.compare(centroid, o.centroid); + if (r == 0) { + r = id - o.id; + } + return r; + } + + public List data() + { + return actualData; + } + + @SuppressWarnings("WeakerAccess") + public void insertData(double x) + { + if (actualData == null) { + actualData = new ArrayList<>(); + } + actualData.add(x); + } + + public static Centroid createWeighted(double x, int w, Iterable data) + { + Centroid r = new Centroid(data != null); + r.add(x, w, data); + return r; + } + + public void add(double x, int w, Iterable data) + { + if (actualData != null) { + if (data != null) { + for (Double old : data) { + actualData.add(old); + } + } + else { + actualData.add(x); + } + } + centroid = TDigest.weightedAverage(centroid, count, x, w); + count += w; + } + + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException + { + in.defaultReadObject(); + id = uniqueCount.getAndIncrement(); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/MergingDigest.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/MergingDigest.java new file mode 100644 index 0000000000000..9f2fe2c2ba70d --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/MergingDigest.java @@ -0,0 +1,925 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +import java.nio.ByteBuffer; +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** + * Maintains a t-digest by collecting new points in a buffer that is then sorted occasionally and merged + * into a sorted array that contains previously computed centroids. + *
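+ * A minimal usage sketch (illustrative only; "samples" is a stand-in for the caller's data):
+ *
+ *   MergingDigest digest = new MergingDigest(100);  // compression factor of 100
+ *   for (double x : samples) {
+ *       digest.add(x);                              // points are buffered and merged lazily
+ *   }
+ *   double p99 = digest.quantile(0.99);             // estimated 99th percentile
+ *   double frac = digest.cdf(p99);                  // approximate fraction of samples at or below p99
+ *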

+ * This can be very fast because the cost of sorting and merging is amortized over several insertions. If
+ * we keep N centroids total and the incoming buffer holds k points, then the amortized cost per insertion
+ * is something like
+ *

+ * N/k + log k + *

+ * These costs even out when N/k = log k. Balancing costs is often a good place to start in optimizing an
+ * algorithm. For different values of compression factor, the following table shows estimated asymptotic
+ * values of N and suggested values of k:
+ *
+ *   Sizing considerations for t-digest:
+ *     Compression      N      k
+ *          50          78     25
+ *         100         157     42
+ *         200         314     73
+ *
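+ * For example, at compression 100 the table gives N = 157 and k = 42, so N/k is roughly 3.7 and the
+ * natural log of k is also roughly 3.7, which is the balance point described above.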

+ * The virtues of this kind of t-digest implementation include:
+ *   - No allocation is required after initialization
+ *   - The data structure automatically compresses existing centroids when possible
+ *   - No Java object overhead is incurred for centroids since data is kept in primitive arrays
+ *

+ * The current implementation takes the liberty of using ping-pong buffers for implementing the merge resulting + * in a substantial memory penalty, but the complexity of an in place merge was not considered as worthwhile + * since even with the overhead, the memory cost is less than 40 bytes per centroid which is much less than half + * what the AVLTreeDigest uses and no dynamic allocation is required at all. + */ +public class MergingDigest extends AbstractTDigest { + private int mergeCount = 0; + + private final double publicCompression; + private final double compression; + + // points to the first unused centroid + private int lastUsedCell; + + // sum_i weight[i] See also unmergedWeight + private double totalWeight = 0; + + // number of points that have been added to each merged centroid + private final double[] weight; + // mean of points added to each merged centroid + private final double[] mean; + + // history of all data added to centroids (for testing purposes) + private List> data = null; + + // sum_i tempWeight[i] + private double unmergedWeight = 0; + + // this is the index of the next temporary centroid + // this is a more Java-like convention than lastUsedCell uses + private int tempUsed = 0; + private final double[] tempWeight; + private final double[] tempMean; + private List> tempData = null; + + + // array used for sorting the temp centroids. This is a field + // to avoid allocations during operation + private final int[] order; + + // if true, alternate upward and downward merge passes + public boolean useAlternatingSort = true; + // if true, use higher working value of compression during construction, then reduce on presentation + public boolean useTwoLevelCompression = true; + + // this forces centroid merging based on size limit rather than + // based on accumulated k-index. This can be much faster since we + // scale functions are more expensive than the corresponding + // weight limits. + public static boolean useWeightLimit = true; + + /** + * Allocates a buffer merging t-digest. This is the normally used constructor that + * allocates default sized internal arrays. Other versions are available, but should + * only be used for special cases. + * + * @param compression The compression factor + */ + @SuppressWarnings("WeakerAccess") + public MergingDigest(double compression) { + this(compression, -1); + } + + /** + * If you know the size of the temporary buffer for incoming points, you can use this entry point. + * + * @param compression Compression factor for t-digest. Same as 1/\delta in the paper. + * @param bufferSize How many samples to retain before merging. + */ + @SuppressWarnings("WeakerAccess") + public MergingDigest(double compression, int bufferSize) { + // we can guarantee that we only need ceiling(compression). + this(compression, bufferSize, -1); + } + + /** + * Fully specified constructor. Normally only used for deserializing a buffer t-digest. + * + * @param compression Compression factor + * @param bufferSize Number of temporary centroids + * @param size Size of main buffer + */ + @SuppressWarnings("WeakerAccess") + public MergingDigest(double compression, int bufferSize, int size) { + // ensure compression >= 10 + // default size = 2 * ceil(compression) + // default bufferSize = 5 * size + // scale = max(2, bufferSize / size - 1) + // compression, publicCompression = sqrt(scale-1)*compression, compression + // ensure size > 2 * compression + weightLimitFudge + // ensure bufferSize > 2*size + + // force reasonable value. 
Anything less than 10 doesn't make much sense because + // too few centroids are retained + if (compression < 10) { + compression = 10; + } + + // the weight limit is too conservative about sizes and can require a bit of extra room + double sizeFudge = 0; + if (useWeightLimit) { + sizeFudge = 10; + if (compression < 30) sizeFudge += 20; + } + + // default size + size = (int) Math.max(2 * compression + sizeFudge, size); + + // default buffer + if (bufferSize == -1) { + // TODO update with current numbers + // having a big buffer is good for speed + // experiments show bufferSize = 1 gives half the performance of bufferSize=10 + // bufferSize = 2 gives 40% worse performance than 10 + // but bufferSize = 5 only costs about 5-10% + // + // compression factor time(us) + // 50 1 0.275799 + // 50 2 0.151368 + // 50 5 0.108856 + // 50 10 0.102530 + // 100 1 0.215121 + // 100 2 0.142743 + // 100 5 0.112278 + // 100 10 0.107753 + // 200 1 0.210972 + // 200 2 0.148613 + // 200 5 0.118220 + // 200 10 0.112970 + // 500 1 0.219469 + // 500 2 0.158364 + // 500 5 0.127552 + // 500 10 0.121505 + bufferSize = 5 * size; + } + + // ensure enough space in buffer + if (bufferSize <= 2 * size) { + bufferSize = 2 * size; + } + + // scale is the ratio of extra buffer to the final size + // we have to account for the fact that we copy all live centroids into the incoming space + double scale = Math.max(1, bufferSize / size - 1); + if (!useTwoLevelCompression) { + scale = 1; + } + + // publicCompression is how many centroids the user asked for + // compression is how many we actually keep + this.publicCompression = compression; + this.compression = Math.sqrt(scale) * publicCompression; + + // changing the compression could cause buffers to be too small, readjust if so + if (size < this.compression + sizeFudge) { + size = (int) Math.ceil(this.compression + sizeFudge); + } + + // ensure enough space in buffer (possibly again) + if (bufferSize <= 2 * size) { + bufferSize = 2 * size; + } + + weight = new double[size]; + mean = new double[size]; + + tempWeight = new double[bufferSize]; + tempMean = new double[bufferSize]; + order = new int[bufferSize]; + + lastUsedCell = 0; + } + + /** + * Turns on internal data recording. 
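+ * Intended for testing only; every sample assigned to a centroid is retained in memory.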
+ */ + @Override + public TDigest recordAllData() { + super.recordAllData(); + data = new ArrayList<>(); + tempData = new ArrayList<>(); + return this; + } + + @Override + void add(double x, int w, Centroid base) { + add(x, w, base.data()); + } + + @Override + public void add(double x, int w) { + add(x, w, (List) null); + } + + private void add(double x, int w, List history) { + if (Double.isNaN(x)) { + throw new IllegalArgumentException("Cannot add NaN to t-digest"); + } + if (tempUsed >= tempWeight.length - lastUsedCell - 1) { + mergeNewValues(); + } + int where = tempUsed++; + tempWeight[where] = w; + tempMean[where] = x; + unmergedWeight += w; + if (x < min) { + min = x; + } + if (x > max) { + max = x; + } + + if (data != null) { + if (tempData == null) { + tempData = new ArrayList<>(); + } + while (tempData.size() <= where) { + tempData.add(new ArrayList()); + } + if (history == null) { + history = Collections.singletonList(x); + } + tempData.get(where).addAll(history); + } + } + + private void add(double[] m, double[] w, int count, List> data) { + if (m.length != w.length) { + throw new IllegalArgumentException("Arrays not same length"); + } + if (m.length < count + lastUsedCell) { + // make room to add existing centroids + double[] m1 = new double[count + lastUsedCell]; + System.arraycopy(m, 0, m1, 0, count); + m = m1; + double[] w1 = new double[count + lastUsedCell]; + System.arraycopy(w, 0, w1, 0, count); + w = w1; + } + double total = 0; + for (int i = 0; i < count; i++) { + total += w[i]; + } + merge(m, w, count, data, null, total, false, compression); + } + + @Override + public void add(List others) { + if (others.size() == 0) { + return; + } + int size = lastUsedCell; + for (TDigest other : others) { + other.compress(); + size += other.centroidCount(); + } + + double[] m = new double[size]; + double[] w = new double[size]; + List> data; + if (recordAllData) { + data = new ArrayList<>(); + } else { + data = null; + } + int offset = 0; + for (TDigest other : others) { + if (other instanceof MergingDigest) { + MergingDigest md = (MergingDigest) other; + System.arraycopy(md.mean, 0, m, offset, md.lastUsedCell); + System.arraycopy(md.weight, 0, w, offset, md.lastUsedCell); + if (data != null) { + for (Centroid centroid : other.centroids()) { + data.add(centroid.data()); + } + } + offset += md.lastUsedCell; + } else { + for (Centroid centroid : other.centroids()) { + m[offset] = centroid.mean(); + w[offset] = centroid.count(); + if (recordAllData) { + assert data != null; + data.add(centroid.data()); + } + offset++; + } + } + } + add(m, w, size, data); + } + + private void mergeNewValues() { + mergeNewValues(false, compression); + } + + private void mergeNewValues(boolean force, double compression) { + if (totalWeight == 0 && unmergedWeight == 0) { + // seriously nothing to do + return; + } + if (force || unmergedWeight > 0) { + // note that we run the merge in reverse every other merge to avoid left-to-right bias in merging + merge(tempMean, tempWeight, tempUsed, tempData, order, unmergedWeight, + useAlternatingSort & mergeCount % 2 == 1, compression); + mergeCount++; + tempUsed = 0; + unmergedWeight = 0; + if (data != null) { + tempData = new ArrayList<>(); + } + } + } + + private void merge(double[] incomingMean, double[] incomingWeight, int incomingCount, + List> incomingData, int[] incomingOrder, + double unmergedWeight, boolean runBackwards, double compression) { + System.arraycopy(mean, 0, incomingMean, incomingCount, lastUsedCell); + System.arraycopy(weight, 0, 
incomingWeight, incomingCount, lastUsedCell); + incomingCount += lastUsedCell; + + if (incomingData != null) { + for (int i = 0; i < lastUsedCell; i++) { + assert data != null; + incomingData.add(data.get(i)); + } + data = new ArrayList<>(); + } + if (incomingOrder == null) { + incomingOrder = new int[incomingCount]; + } + Sort.sort(incomingOrder, incomingMean, incomingCount); + // option to run backwards is to investigate bias in errors + if (runBackwards) { + Sort.reverse(incomingOrder, 0, incomingCount); + } + + totalWeight += unmergedWeight; + + assert (lastUsedCell + incomingCount) > 0; + lastUsedCell = 0; + mean[lastUsedCell] = incomingMean[incomingOrder[0]]; + weight[lastUsedCell] = incomingWeight[incomingOrder[0]]; + double wSoFar = 0; + if (data != null) { + assert incomingData != null; + data.add(incomingData.get(incomingOrder[0])); + } + + + // weight will contain all zeros after this loop + + double normalizer = scale.normalizer(compression, totalWeight); + double k1 = scale.k(0, normalizer); + double wLimit = totalWeight * scale.q(k1 + 1, normalizer); + for (int i = 1; i < incomingCount; i++) { + int ix = incomingOrder[i]; + double proposedWeight = weight[lastUsedCell] + incomingWeight[ix]; + double projectedW = wSoFar + proposedWeight; + boolean addThis; + if (useWeightLimit) { + double q0 = wSoFar / totalWeight; + double q2 = (wSoFar + proposedWeight) / totalWeight; + addThis = proposedWeight <= totalWeight * Math.min(scale.max(q0, normalizer), scale.max(q2, normalizer)); + } else { + addThis = projectedW <= wLimit; + } + + if (addThis) { + // next point will fit + // so merge into existing centroid + weight[lastUsedCell] += incomingWeight[ix]; + mean[lastUsedCell] = mean[lastUsedCell] + (incomingMean[ix] - mean[lastUsedCell]) * incomingWeight[ix] / weight[lastUsedCell]; + incomingWeight[ix] = 0; + + if (data != null) { + while (data.size() <= lastUsedCell) { + data.add(new ArrayList()); + } + assert incomingData != null; + assert data.get(lastUsedCell) != incomingData.get(ix); + data.get(lastUsedCell).addAll(incomingData.get(ix)); + } + } else { + // didn't fit ... move to next output, copy out first centroid + wSoFar += weight[lastUsedCell]; + if (!useWeightLimit) { + k1 = scale.k(wSoFar / totalWeight, normalizer); + wLimit = totalWeight * scale.q(k1 + 1, normalizer); + } + + lastUsedCell++; + mean[lastUsedCell] = incomingMean[ix]; + weight[lastUsedCell] = incomingWeight[ix]; + incomingWeight[ix] = 0; + + if (data != null) { + assert incomingData != null; + assert data.size() == lastUsedCell; + data.add(incomingData.get(ix)); + } + } + } + // points to next empty cell + lastUsedCell++; + + // sanity check + double sum = 0; + for (int i = 0; i < lastUsedCell; i++) { + sum += weight[i]; + } + assert sum == totalWeight; + if (runBackwards) { + Sort.reverse(mean, 0, lastUsedCell); + Sort.reverse(weight, 0, lastUsedCell); + if (data != null) { + Collections.reverse(data); + } + } + + if (totalWeight > 0) { + min = Math.min(min, mean[0]); + max = Math.max(max, mean[lastUsedCell - 1]); + } + } + + /** + * Exposed for testing. 
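+ * Returns the number of centroids whose weight exceeds the limit implied by the scale function.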
+ */ + int checkWeights() { + return checkWeights(weight, totalWeight, lastUsedCell); + } + + private int checkWeights(double[] w, double total, int last) { + int badCount = 0; + + int n = last; + if (w[n] > 0) { + n++; + } + + double normalizer = scale.normalizer(publicCompression, totalWeight); + double k1 = scale.k(0, normalizer); + double q = 0; + double left = 0; + String header = "\n"; + for (int i = 0; i < n; i++) { + double dq = w[i] / total; + double k2 = scale.k(q + dq, normalizer); + q += dq / 2; + if (k2 - k1 > 1 && w[i] != 1) { + System.out.printf("%sOversize centroid at " + + "%d, k0=%.2f, k1=%.2f, dk=%.2f, w=%.2f, q=%.4f, dq=%.4f, left=%.1f, current=%.2f maxw=%.2f\n", + header, i, k1, k2, k2 - k1, w[i], q, dq, left, w[i], totalWeight * scale.max(q, normalizer)); + header = ""; + badCount++; + } + if (k2 - k1 > 4 && w[i] != 1) { + throw new IllegalStateException( + String.format("Egregiously oversized centroid at " + + "%d, k0=%.2f, k1=%.2f, dk=%.2f, w=%.2f, q=%.4f, dq=%.4f, left=%.1f, current=%.2f, maxw=%.2f\n", + i, k1, k2, k2 - k1, w[i], q, dq, left, w[i], totalWeight * scale.max(q, normalizer))); + } + q += dq / 2; + left += w[i]; + k1 = k2; + } + + return badCount; + } + + /** + * Merges any pending inputs and compresses the data down to the public setting. + * Note that this typically loses a bit of precision and thus isn't a thing to + * be doing all the time. It is best done only when we want to show results to + * the outside world. + */ + @Override + public void compress() { + mergeNewValues(true, publicCompression); + } + + @Override + public long size() { + return (long) (totalWeight + unmergedWeight); + } + + @Override + public double cdf(double x) { + mergeNewValues(); + + if (lastUsedCell == 0) { + // no data to examine + return Double.NaN; + } else if (lastUsedCell == 1) { + // exactly one centroid, should have max==min + double width = max - min; + if (x < min) { + return 0; + } else if (x > max) { + return 1; + } else if (x - min <= width) { + // min and max are too close together to do any viable interpolation + return 0.5; + } else { + // interpolate if somehow we have weight > 0 and max != min + return (x - min) / (max - min); + } + } else { + int n = lastUsedCell; + if (x < min) { + return 0; + } + + if (x > max) { + return 1; + } + + // check for the left tail + if (x < mean[0]) { + // note that this is different than mean[0] > min + // ... 
this guarantees we divide by non-zero number and interpolation works + if (mean[0] - min > 0) { + // must be a sample exactly at min + if (x == min) { + return 0.5 / totalWeight; + } else { + return (1 + (x - min) / (mean[0] - min) * (weight[0] / 2 - 1)) / totalWeight; + } + } else { + // this should be redundant with the check x < min + return 0; + } + } + assert x >= mean[0]; + + // and the right tail + if (x > mean[n - 1]) { + if (max - mean[n - 1] > 0) { + if (x == max) { + return 1 - 0.5 / totalWeight; + } else { + // there has to be a single sample exactly at max + double dq = (1 + (max - x) / (max - mean[n - 1]) * (weight[n - 1] / 2 - 1)) / totalWeight; + return 1 - dq; + } + } else { + return 1; + } + } + + // we know that there are at least two centroids and mean[0] < x < mean[n-1] + // that means that there are either one or more consecutive centroids all at exactly x + // or there are consecutive centroids, c0 < x < c1 + double weightSoFar = 0; + for (int it = 0; it < n - 1; it++) { + // weightSoFar does not include weight[it] yet + if (mean[it] == x) { + // we have one or more centroids == x, treat them as one + // dw will accumulate the weight of all of the centroids at x + double dw = 0; + while (it < n && mean[it] == x) { + dw += weight[it]; + it++; + } + return (weightSoFar + dw / 2) / totalWeight; + } else if (mean[it] <= x && x < mean[it + 1]) { + // landed between centroids ... check for floating point madness + if (mean[it + 1] - mean[it] > 0) { + // note how we handle singleton centroids here + // the point is that for singleton centroids, we know that their entire + // weight is exactly at the centroid and thus shouldn't be involved in + // interpolation + double leftExcludedW = 0; + double rightExcludedW = 0; + if (weight[it] == 1) { + if (weight[it + 1] == 1) { + // two singletons means no interpolation + // left singleton is in, right is out + return (weightSoFar + 1) / totalWeight; + } else { + leftExcludedW = 0.5; + } + } else if (weight[it + 1] == 1) { + rightExcludedW = 0.5; + } + double dw = (weight[it] + weight[it + 1]) / 2; + + // can't have double singleton (handled that earlier) + assert dw > 1; + assert (leftExcludedW + rightExcludedW) <= 0.5; + + // adjust endpoints for any singleton + double left = mean[it]; + double right = mean[it + 1]; + + double dwNoSingleton = dw - leftExcludedW - rightExcludedW; + + // adjustments have only limited effect on endpoints + assert dwNoSingleton > dw / 2; + assert right - left > 0; + double base = weightSoFar + weight[it] / 2 + leftExcludedW; + return (base + dwNoSingleton * (x - left) / (right - left)) / totalWeight; + } else { + // this is simply caution against floating point madness + // it is conceivable that the centroids will be different + // but too near to allow safe interpolation + double dw = (weight[it] + weight[it + 1]) / 2; + return (weightSoFar + dw) / totalWeight; + } + } else { + weightSoFar += weight[it]; + } + } + if (x == mean[n - 1]) { + return 1 - 0.5 / totalWeight; + } else { + throw new IllegalStateException("Can't happen ... 
loop fell through"); + } + } + } + + @Override + public double quantile(double q) { + if (q < 0 || q > 1) { + throw new IllegalArgumentException("q should be in [0,1], got " + q); + } + mergeNewValues(); + + if (lastUsedCell == 0) { + // no centroids means no data, no way to get a quantile + return Double.NaN; + } else if (lastUsedCell == 1) { + // with one data point, all quantiles lead to Rome + return mean[0]; + } + + // we know that there are at least two centroids now + int n = lastUsedCell; + + // if values were stored in a sorted array, index would be the offset we are interested in + final double index = q * totalWeight; + + // beyond the boundaries, we return min or max + // usually, the first centroid will have unit weight so this will make it moot + if (index < 1) { + return min; + } + + // if the left centroid has more than one sample, we still know + // that one sample occurred at min so we can do some interpolation + if (weight[0] > 1 && index < weight[0] / 2) { + // there is a single sample at min so we interpolate with less weight + return min + (index - 1) / (weight[0] / 2 - 1) * (mean[0] - min); + } + + // usually the last centroid will have unit weight so this test will make it moot + if (index > totalWeight - 1) { + return max; + } + + // if the right-most centroid has more than one sample, we still know + // that one sample occurred at max so we can do some interpolation + if (weight[n-1] > 1 && totalWeight - index <= weight[n - 1] / 2) { + return max - (totalWeight - index - 1) / (weight[n - 1] / 2 - 1) * (max - mean[n - 1]); + } + + // in between extremes we interpolate between centroids + double weightSoFar = weight[0] / 2; + for (int i = 0; i < n - 1; i++) { + double dw = (weight[i] + weight[i + 1]) / 2; + if (weightSoFar + dw > index) { + // centroids i and i+1 bracket our current point + + // check for unit weight + double leftUnit = 0; + if (weight[i] == 1) { + if (index - weightSoFar < 0.5) { + // within the singleton's sphere + return mean[i]; + } else { + leftUnit = 0.5; + } + } + double rightUnit = 0; + if (weight[i + 1] == 1) { + if (weightSoFar + dw - index <= 0.5) { + // no interpolation needed near singleton + return mean[i + 1]; + } + rightUnit = 0.5; + } + double z1 = index - weightSoFar - leftUnit; + double z2 = weightSoFar + dw - index - rightUnit; + return weightedAverage(mean[i], z2, mean[i + 1], z1); + } + weightSoFar += dw; + } + // we handled singleton at end up above + assert weight[n - 1] > 1; + assert index <= totalWeight; + assert index >= totalWeight - weight[n - 1] / 2; + + // weightSoFar = totalWeight - weight[n-1]/2 (very nearly) + // so we interpolate out to max value ever seen + double z1 = index - totalWeight - weight[n - 1] / 2.0; + double z2 = weight[n - 1] / 2 - z1; + return weightedAverage(mean[n - 1], z1, max, z2); + } + + @Override + public int centroidCount() { + mergeNewValues(); + return lastUsedCell; + } + + @Override + public Collection centroids() { + // we don't actually keep centroid structures around so we have to fake it + compress(); + return new AbstractCollection() { + @Override + public Iterator iterator() { + return new Iterator() { + int i = 0; + + @Override + public boolean hasNext() { + return i < lastUsedCell; + } + + @Override + public Centroid next() { + Centroid rc = new Centroid(mean[i], (int) weight[i], data != null ? 
data.get(i) : null); + i++; + return rc; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Default operation"); + } + }; + } + + @Override + public int size() { + return lastUsedCell; + } + }; + } + + @Override + public double compression() { + return publicCompression; + } + + @Override + public int byteSize() { + compress(); + // format code, compression(float), buffer-size(int), temp-size(int), #centroids-1(int), + // then two doubles per centroid + return lastUsedCell * 16 + 32; + } + + @Override + public int smallByteSize() { + compress(); + // format code(int), compression(float), buffer-size(short), temp-size(short), #centroids-1(short), + // then two floats per centroid + return lastUsedCell * 8 + 30; + } + + @SuppressWarnings("WeakerAccess") + public ScaleFunction getScaleFunction() { + return scale; + } + + public enum Encoding { + VERBOSE_ENCODING(1), SMALL_ENCODING(2); + + private final int code; + + Encoding(int code) { + this.code = code; + } + } + + @Override + public void asBytes(ByteBuffer buf) { + compress(); + buf.putInt(Encoding.VERBOSE_ENCODING.code); + buf.putDouble(min); + buf.putDouble(max); + buf.putDouble(publicCompression); + buf.putInt(lastUsedCell); + for (int i = 0; i < lastUsedCell; i++) { + buf.putDouble(weight[i]); + buf.putDouble(mean[i]); + } + } + + @Override + public void asSmallBytes(ByteBuffer buf) { + compress(); + buf.putInt(Encoding.SMALL_ENCODING.code); // 4 + buf.putDouble(min); // + 8 + buf.putDouble(max); // + 8 + buf.putFloat((float) publicCompression); // + 4 + buf.putShort((short) mean.length); // + 2 + buf.putShort((short) tempMean.length); // + 2 + buf.putShort((short) lastUsedCell); // + 2 = 30 + for (int i = 0; i < lastUsedCell; i++) { + buf.putFloat((float) weight[i]); + buf.putFloat((float) mean[i]); + } + } + + @SuppressWarnings("WeakerAccess") + public static MergingDigest fromBytes(ByteBuffer buf) { + int encoding = buf.getInt(); + if (encoding == Encoding.VERBOSE_ENCODING.code) { + double min = buf.getDouble(); + double max = buf.getDouble(); + double compression = buf.getDouble(); + int n = buf.getInt(); + MergingDigest r = new MergingDigest(compression); + r.setMinMax(min, max); + r.lastUsedCell = n; + for (int i = 0; i < n; i++) { + r.weight[i] = buf.getDouble(); + r.mean[i] = buf.getDouble(); + + r.totalWeight += r.weight[i]; + } + return r; + } else if (encoding == Encoding.SMALL_ENCODING.code) { + double min = buf.getDouble(); + double max = buf.getDouble(); + double compression = buf.getFloat(); + int n = buf.getShort(); + int bufferSize = buf.getShort(); + MergingDigest r = new MergingDigest(compression, bufferSize, n); + r.setMinMax(min, max); + r.lastUsedCell = buf.getShort(); + for (int i = 0; i < r.lastUsedCell; i++) { + r.weight[i] = buf.getFloat(); + r.mean[i] = buf.getFloat(); + + r.totalWeight += r.weight[i]; + } + return r; + } else { + throw new IllegalStateException("Invalid format for serialized histogram"); + } + + } + + @Override + public String toString() { + return "MergingDigest" + + "-" + getScaleFunction() + + "-" + (useWeightLimit ? "weight" : "kSize") + + "-" + (useAlternatingSort ? "alternating" : "stable") + + "-" + (useTwoLevelCompression ? 
"twoLevel" : "oneLevel"); + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/ScaleFunction.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/ScaleFunction.java new file mode 100644 index 0000000000000..190cba8dab3e7 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/ScaleFunction.java @@ -0,0 +1,617 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +/** + * Encodes the various scale functions for t-digests. These limits trade accuracy near the tails against accuracy near + * the median in different ways. For instance, K_0 has uniform cluster sizes and results in constant accuracy (in terms + * of q) while K_3 has cluster sizes proportional to min(q,1-q) which results in very much smaller error near the tails + * and modestly increased error near the median. + *

+ * The base forms (K_0, K_1, K_2 and K_3) all result in t-digests limited to a number of clusters equal to the + * compression factor. The K_2_NO_NORM and K_3_NO_NORM versions result in the cluster count increasing roughly with + * log(n). + */ +public enum ScaleFunction { + /** + * Generates uniform cluster sizes. Used for comparison only. + */ + K_0 { + @Override + public double k(double q, double compression, double n) { + return compression * q / 2; + } + + @Override + public double k(double q, double normalizer) { + return normalizer * q; + } + + @Override + public double q(double k, double compression, double n) { + return 2 * k / compression; + } + + @Override + public double q(double k, double normalizer) { + return k / normalizer; + } + + @Override + public double max(double q, double compression, double n) { + return 2 / compression; + } + + @Override + public double max(double q, double normalizer) { + return 1 / normalizer; + } + + @Override + public double normalizer(double compression, double n) { + return compression / 2; + } + }, + + /** + * Generates cluster sizes proportional to sqrt(q*(1-q)). This gives constant relative accuracy if accuracy is + * proportional to squared cluster size. It is expected that K_2 and K_3 will give better practical results. + */ + K_1 { + @Override + public double k(double q, double compression, double n) { + return compression * Math.asin(2 * q - 1) / (2 * Math.PI); + } + + @Override + public double k(double q, double normalizer) { + return normalizer * Math.asin(2 * q - 1); + } + + + @Override + public double q(double k, double compression, double n) { + return (Math.sin(k * (2 * Math.PI / compression)) + 1) / 2; + } + + @Override + public double q(double k, double normalizer) { + return (Math.sin(k / normalizer) + 1) / 2; + } + + @Override + public double max(double q, double compression, double n) { + if (q <= 0) { + return 0; + } else if (q >= 1) { + return 0; + } else { + return 2 * Math.sin(Math.PI / compression) * Math.sqrt(q * (1 - q)); + } + } + + @Override + public double max(double q, double normalizer) { + if (q <= 0) { + return 0; + } else if (q >= 1) { + return 0; + } else { + return 2 * Math.sin(0.5 / normalizer) * Math.sqrt(q * (1 - q)); + } + } + + @Override + public double normalizer(double compression, double n) { + return compression / (2 * Math.PI); + } + }, + + /** + * Generates cluster sizes proportional to sqrt(q*(1-q)) but avoids computation of asin in the critical path by + * using an approximate version. 
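+ * The approximation (see fastAsin at the bottom of this file) agrees with Math.asin to within about 1e-6.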
+ */ + K_1_FAST { + @Override + public double k(double q, double compression, double n) { + return compression * fastAsin(2 * q - 1) / (2 * Math.PI); + } + + @Override + public double k(double q, double normalizer) { + return normalizer * fastAsin(2 * q - 1); + } + + @Override + public double q(double k, double compression, double n) { + return (Math.sin(k * (2 * Math.PI / compression)) + 1) / 2; + } + + @Override + public double q(double k, double normalizer) { + return (Math.sin(k / normalizer) + 1) / 2; + } + + @Override + public double max(double q, double compression, double n) { + if (q <= 0) { + return 0; + } else if (q >= 1) { + return 0; + } else { + return 2 * Math.sin(Math.PI / compression) * Math.sqrt(q * (1 - q)); + } + } + + @Override + public double max(double q, double normalizer) { + if (q <= 0) { + return 0; + } else if (q >= 1) { + return 0; + } else { + return 2 * Math.sin(0.5 / normalizer) * Math.sqrt(q * (1 - q)); + } + } + + @Override + public double normalizer(double compression, double n) { + return compression / (2 * Math.PI); + } + }, + + /** + * Generates cluster sizes proportional to q*(1-q). This makes tail error bounds tighter than for K_1. The use of a + * normalizing function results in a strictly bounded number of clusters no matter how many samples. + */ + K_2 { + @Override + public double k(double q, double compression, double n) { + if (n <= 1) { + if (q <= 0) { + return -10; + } else if (q >= 1) { + return 10; + } else { + return 0; + } + } + if (q == 0) { + return 2 * k(1 / n, compression, n); + } else if (q == 1) { + return 2 * k((n - 1) / n, compression, n); + } else { + return compression * Math.log(q / (1 - q)) / Z(compression, n); + } + } + + @Override + public double k(double q, double normalizer) { + if (q < 1e-15) { + // this will return something more extreme than q = 1/n + return 2 * k(1e-15, normalizer); + } else if (q > 1 - 1e-15) { + // this will return something more extreme than q = (n-1)/n + return 2 * k(1 - 1e-15, normalizer); + } else { + return Math.log(q / (1 - q)) * normalizer; + } + } + + @Override + public double q(double k, double compression, double n) { + double w = Math.exp(k * Z(compression, n) / compression); + return w / (1 + w); + } + + @Override + public double q(double k, double normalizer) { + double w = Math.exp(k / normalizer); + return w / (1 + w); + } + + @Override + public double max(double q, double compression, double n) { + return Z(compression, n) * q * (1 - q) / compression; + } + + @Override + public double max(double q, double normalizer) { + return q * (1 - q) / normalizer; + } + + @Override + public double normalizer(double compression, double n) { + return compression / Z(compression, n); + } + + private double Z(double compression, double n) { + return 4 * Math.log(n / compression) + 24; + } + }, + + /** + * Generates cluster sizes proportional to min(q, 1-q). This makes tail error bounds tighter than for K_1 or K_2. + * The use of a normalizing function results in a strictly bounded number of clusters no matter how many samples. 
+ */ + K_3 { + @Override + public double k(double q, double compression, double n) { + if (q < 0.9 / n) { + return 10 * k(1 / n, compression, n); + } else if (q > 1 - 0.9 / n) { + return 10 * k((n - 1) / n, compression, n); + } else { + if (q <= 0.5) { + return compression * Math.log(2 * q) / Z(compression, n); + } else { + return -k(1 - q, compression, n); + } + } + } + + @Override + public double k(double q, double normalizer) { + if (q < 1e-15) { + return 10 * k(1e-15, normalizer); + } else if (q > 1 - 1e-15) { + return 10 * k(1 - 1e-15, normalizer); + } else { + if (q <= 0.5) { + return Math.log(2 * q) / normalizer; + } else { + return -k(1 - q, normalizer); + } + } + } + + @Override + public double q(double k, double compression, double n) { + if (k <= 0) { + return Math.exp(k * Z(compression, n) / compression) / 2; + } else { + return 1 - q(-k, compression, n); + } + } + + @Override + public double q(double k, double normalizer) { + if (k <= 0) { + return Math.exp(k / normalizer) / 2; + } else { + return 1 - q(-k, normalizer); + } + } + + @Override + public double max(double q, double compression, double n) { + return Z(compression, n) * Math.min(q, 1 - q) / compression; + } + + @Override + public double max(double q, double normalizer) { + return Math.min(q, 1 - q) / normalizer; + } + + @Override + public double normalizer(double compression, double n) { + return compression / Z(compression, n); + } + + private double Z(double compression, double n) { + return 4 * Math.log(n / compression) + 21; + } + }, + + /** + * Generates cluster sizes proportional to q*(1-q). This makes the tail error bounds tighter. This version does not + * use a normalizer function and thus the number of clusters increases roughly proportional to log(n). That is good + * for accuracy, but bad for size and bad for the statically allocated MergingDigest, but can be useful for + * tree-based implementations. + */ + K_2_NO_NORM { + @Override + public double k(double q, double compression, double n) { + if (q == 0) { + return 2 * k(1 / n, compression, n); + } else if (q == 1) { + return 2 * k((n - 1) / n, compression, n); + } else { + return compression * Math.log(q / (1 - q)); + } + } + + @Override + public double k(double q, double normalizer) { + if (q <= 1e-15) { + return 2 * k(1e-15, normalizer); + } else if (q >= 1 - 1e-15) { + return 2 * k(1 - 1e-15, normalizer); + } else { + return normalizer * Math.log(q / (1 - q)); + } + } + + @Override + public double q(double k, double compression, double n) { + double w = Math.exp(k / compression); + return w / (1 + w); + } + + @Override + public double q(double k, double normalizer) { + double w = Math.exp(k / normalizer); + return w / (1 + w); + } + + @Override + public double max(double q, double compression, double n) { + return q * (1 - q) / compression; + } + + @Override + public double max(double q, double normalizer) { + return q * (1 - q) / normalizer; + } + + @Override + public double normalizer(double compression, double n) { + return compression; + } + }, + + /** + * Generates cluster sizes proportional to min(q, 1-q). This makes the tail error bounds tighter. This version does + * not use a normalizer function and thus the number of clusters increases roughly proportional to log(n). That is + * good for accuracy, but bad for size and bad for the statically allocated MergingDigest, but can be useful for + * tree-based implementations. 
+ */ + K_3_NO_NORM { + @Override + public double k(double q, double compression, double n) { + if (q < 0.9 / n) { + return 10 * k(1 / n, compression, n); + } else if (q > 1 - 0.9 / n) { + return 10 * k((n - 1) / n, compression, n); + } else { + if (q <= 0.5) { + return compression * Math.log(2 * q); + } else { + return -k(1 - q, compression, n); + } + } + } + + @Override + public double k(double q, double normalizer) { + if (q <= 1e-15) { + return 10 * k(1e-15, normalizer); + } else if (q > 1 - 1e-15) { + return 10 * k(1 - 1e-15, normalizer); + } else { + if (q <= 0.5) { + return normalizer * Math.log(2 * q); + } else { + return -k(1 - q, normalizer); + } + } + } + + @Override + public double q(double k, double compression, double n) { + if (k <= 0) { + return Math.exp(k / compression) / 2; + } else { + return 1 - q(-k, compression, n); + } + } + + @Override + public double q(double k, double normalizer) { + if (k <= 0) { + return Math.exp(k / normalizer) / 2; + } else { + return 1 - q(-k, normalizer); + } + } + + @Override + public double max(double q, double compression, double n) { + return Math.min(q, 1 - q) / compression; + } + + @Override + public double max(double q, double normalizer) { + return Math.min(q, 1 - q) / normalizer; + } + + @Override + public double normalizer(double compression, double n) { + return compression; + } + }; // max weight is min(q,1-q), should improve tail accuracy even more + + /** + * Converts a quantile to the k-scale. The total number of points is also provided so that a normalizing function + * can be computed if necessary. + * + * @param q The quantile + * @param compression Also known as delta in literature on the t-digest + * @param n The total number of samples + * @return The corresponding value of k + */ + abstract public double k(double q, double compression, double n); + + /** + * Converts a quantile to the k-scale. The normalizer value depends on compression and (possibly) number of points + * in the digest. #normalizer(double, double) + * + * @param q The quantile + * @param normalizer The normalizer value which depends on compression and (possibly) number of points in the + * digest. + * @return The corresponding value of k + */ + abstract public double k(double q, double normalizer); + + /** + * Computes q as a function of k. This is often faster than finding k as a function of q for some scales. + * + * @param k The index value to convert into q scale. + * @param compression The compression factor (often written as δ) + * @param n The number of samples already in the digest. + * @return The value of q that corresponds to k + */ + abstract public double q(double k, double compression, double n); + + /** + * Computes q as a function of k. This is often faster than finding k as a function of q for some scales. + * + * @param k The index value to convert into q scale. + * @param normalizer The normalizer value which depends on compression and (possibly) number of points in the + * digest. + * @return The value of q that corresponds to k + */ + abstract public double q(double k, double normalizer); + + /** + * Computes the maximum relative size a cluster can have at quantile q. Note that exactly where within the range + * spanned by a cluster that q should be isn't clear. That means that this function usually has to be taken at + * multiple points and the smallest value used. + *

+ * Note that this is the relative size of a cluster. To get the max number of samples in the cluster, multiply this + * value times the total number of samples in the digest. + * + * @param q The quantile + * @param compression The compression factor, typically delta in the literature + * @param n The number of samples seen so far in the digest + * @return The maximum number of samples that can be in the cluster + */ + abstract public double max(double q, double compression, double n); + + /** + * Computes the maximum relative size a cluster can have at quantile q. Note that exactly where within the range + * spanned by a cluster that q should be isn't clear. That means that this function usually has to be taken at + * multiple points and the smallest value used. + *

+ * Note that this is the relative size of a cluster. To get the max number of samples in the cluster, multiply this + * value times the total number of samples in the digest. + * + * @param q The quantile + * @param normalizer The normalizer value which depends on compression and (possibly) number of points in the + * digest. + * @return The maximum number of samples that can be in the cluster + */ + abstract public double max(double q, double normalizer); + + /** + * Computes the normalizer given compression and number of points. + */ + abstract public double normalizer(double compression, double n); + + /** + * Approximates asin to within about 1e-6. This approximation works by breaking the range from 0 to 1 into 5 regions + * for all but the region nearest 1, rational polynomial models get us a very good approximation of asin and by + * interpolating as we move from region to region, we can guarantee continuity and we happen to get monotonicity as + * well. for the values near 1, we just use Math.asin as our region "approximation". + * + * @param x sin(theta) + * @return theta + */ + static double fastAsin(double x) { + if (x < 0) { + return -fastAsin(-x); + } else if (x > 1) { + return Double.NaN; + } else { + // Cutoffs for models. Note that the ranges overlap. In the + // overlap we do linear interpolation to guarantee the overall + // result is "nice" + double c0High = 0.1; + double c1High = 0.55; + double c2Low = 0.5; + double c2High = 0.8; + double c3Low = 0.75; + double c3High = 0.9; + double c4Low = 0.87; + if (x > c3High) { + return Math.asin(x); + } else { + // the models + double[] m0 = {0.2955302411, 1.2221903614, 0.1488583743, 0.2422015816, -0.3688700895, 0.0733398445}; + double[] m1 = {-0.0430991920, 0.9594035750, -0.0362312299, 0.1204623351, 0.0457029620, -0.0026025285}; + double[] m2 = {-0.034873933724, 1.054796752703, -0.194127063385, 0.283963735636, 0.023800124916, -0.000872727381}; + double[] m3 = {-0.37588391875, 2.61991859025, -2.48835406886, 1.48605387425, 0.00857627492, -0.00015802871}; + + // the parameters for all of the models + double[] vars = {1, x, x * x, x * x * x, 1 / (1 - x), 1 / (1 - x) / (1 - x)}; + + // raw grist for interpolation coefficients + double x0 = bound((c0High - x) / c0High); + double x1 = bound((c1High - x) / (c1High - c2Low)); + double x2 = bound((c2High - x) / (c2High - c3Low)); + double x3 = bound((c3High - x) / (c3High - c4Low)); + + // interpolation coefficients + //noinspection UnnecessaryLocalVariable + double mix0 = x0; + double mix1 = (1 - x0) * x1; + double mix2 = (1 - x1) * x2; + double mix3 = (1 - x2) * x3; + double mix4 = 1 - x3; + + // now mix all the results together, avoiding extra evaluations + double r = 0; + if (mix0 > 0) { + r += mix0 * eval(m0, vars); + } + if (mix1 > 0) { + r += mix1 * eval(m1, vars); + } + if (mix2 > 0) { + r += mix2 * eval(m2, vars); + } + if (mix3 > 0) { + r += mix3 * eval(m3, vars); + } + if (mix4 > 0) { + // model 4 is just the real deal + r += mix4 * Math.asin(x); + } + return r; + } + } + } + + private static double eval(double[] model, double[] vars) { + double r = 0; + for (int i = 0; i < model.length; i++) { + r += model[i] * vars[i]; + } + return r; + } + + private static double bound(double v) { + if (v <= 0) { + return 0; + } else if (v >= 1) { + return 1; + } else { + return v; + } + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/Sort.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/Sort.java new file mode 
100644 index 0000000000000..602c98430a55c --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/Sort.java @@ -0,0 +1,482 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +import java.util.Random; + +/** + * Static sorting methods + */ +public class Sort { + private static final Random prng = new Random(); // for choosing pivots during quicksort + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes 0..values.length + * + * @param order Indexes into values + * @param values The values to sort. + */ + @SuppressWarnings("WeakerAccess") + public static void sort(int[] order, double[] values) { + sort(order, values, 0, values.length); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes 0..n + * + * @param order Indexes into values + * @param values The values to sort. + * @param n The number of values to sort + */ + @SuppressWarnings("WeakerAccess") + public static void sort(int[] order, double[] values, int n) { + sort(order, values, 0, n); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes start..n + * + * @param order Indexes into values + * @param values The values to sort. + * @param start The first element to sort + * @param n The number of values to sort + */ + @SuppressWarnings("WeakerAccess") + public static void sort(int[] order, double[] values, int start, int n) { + for (int i = start; i < start + n; i++) { + order[i] = i; + } + quickSort(order, values, start, start + n, 64); + insertionSort(order, values, start, start + n, 64); + } + + /** + * Standard quick sort except that sorting is done on an index array rather than the values themselves + * + * @param order The pre-allocated index array + * @param values The values to sort + * @param start The beginning of the values to sort + * @param end The value after the last value to sort + * @param limit The minimum size to recurse down to. 
+ */ + private static void quickSort(int[] order, double[] values, int start, int end, int limit) { + // the while loop implements tail-recursion to avoid excessive stack calls on nasty cases + while (end - start > limit) { + + // pivot by a random element + int pivotIndex = start + prng.nextInt(end - start); + double pivotValue = values[order[pivotIndex]]; + + // move pivot to beginning of array + swap(order, start, pivotIndex); + + // we use a three way partition because many duplicate values is an important case + + int low = start + 1; // low points to first value not known to be equal to pivotValue + int high = end; // high points to first value > pivotValue + int i = low; // i scans the array + while (i < high) { + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // in-loop: i < high + // in-loop: low < high + // in-loop: i >= low + double vi = values[order[i]]; + if (vi == pivotValue) { + if (low != i) { + swap(order, low, i); + } else { + i++; + } + low++; + } else if (vi > pivotValue) { + high--; + swap(order, i, high); + } else { + // vi < pivotValue + i++; + } + } + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // assert i == high || low == high therefore, we are done with partition + + // at this point, i==high, from [start,low) are == pivot, [low,high) are < and [high,end) are > + // we have to move the values equal to the pivot into the middle. To do this, we swap pivot + // values into the top end of the [low,high) range stopping when we run out of destinations + // or when we run out of values to copy + int from = start; + int to = high - 1; + for (i = 0; from < low && to >= low; i++) { + swap(order, from++, to--); + } + if (from == low) { + // ran out of things to copy. This means that the the last destination is the boundary + low = to + 1; + } else { + // ran out of places to copy to. This means that there are uncopied pivots and the + // boundary is at the beginning of those + low = from; + } + +// checkPartition(order, values, pivotValue, start, low, high, end); + + // now recurse, but arrange it so we handle the longer limit by tail recursion + if (low - start < end - high) { + quickSort(order, values, start, low, limit); + + // this is really a way to do + // quickSort(order, values, high, end, limit); + start = high; + } else { + quickSort(order, values, high, end, limit); + // this is really a way to do + // quickSort(order, values, start, low, limit); + end = low; + } + } + } + + /** + * Quick sort in place of several paired arrays. On return, + * keys[...] is in order and the values[] arrays will be + * reordered as well in the same way. + * + * @param key Values to sort on + * @param values The auxilliary values to sort. + */ + @SuppressWarnings("WeakerAccess") + public static void sort(double[] key, double[] ... values) { + sort(key, 0, key.length, values); + } + + /** + * Quick sort using an index array. On return, + * values[order[i]] is in order as i goes start..n + * @param key Values to sort on + * @param start The first element to sort + * @param n The number of values to sort + * @param values The auxilliary values to sort. + */ + @SuppressWarnings("WeakerAccess") + public static void sort(double[] key, int start, int n, double[]... 
values) { + quickSort(key, values, start, start + n, 8); + insertionSort(key, values, start, start + n, 8); + } + + /** + * Standard quick sort except that sorting rearranges parallel arrays + * + * @param key Values to sort on + * @param values The auxilliary values to sort. + * @param start The beginning of the values to sort + * @param end The value after the last value to sort + * @param limit The minimum size to recurse down to. + */ + private static void quickSort(double[] key, double[][] values, int start, int end, int limit) { + // the while loop implements tail-recursion to avoid excessive stack calls on nasty cases + while (end - start > limit) { + + // median of three values for the pivot + int a = start; + int b = (start + end) / 2; + int c = end - 1; + + int pivotIndex; + double pivotValue; + double va = key[a]; + double vb = key[b]; + double vc = key[c]; + //noinspection Duplicates + if (va > vb) { + if (vc > va) { + // vc > va > vb + pivotIndex = a; + pivotValue = va; + } else { + // va > vb, va >= vc + if (vc < vb) { + // va > vb > vc + pivotIndex = b; + pivotValue = vb; + } else { + // va >= vc >= vb + pivotIndex = c; + pivotValue = vc; + } + } + } else { + // vb >= va + if (vc > vb) { + // vc > vb >= va + pivotIndex = b; + pivotValue = vb; + } else { + // vb >= va, vb >= vc + if (vc < va) { + // vb >= va > vc + pivotIndex = a; + pivotValue = va; + } else { + // vb >= vc >= va + pivotIndex = c; + pivotValue = vc; + } + } + } + + // move pivot to beginning of array + swap(start, pivotIndex, key, values); + + // we use a three way partition because many duplicate values is an important case + + int low = start + 1; // low points to first value not known to be equal to pivotValue + int high = end; // high points to first value > pivotValue + int i = low; // i scans the array + while (i < high) { + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // in-loop: i < high + // in-loop: low < high + // in-loop: i >= low + double vi = key[i]; + if (vi == pivotValue) { + if (low != i) { + swap(low, i, key, values); + } else { + i++; + } + low++; + } else if (vi > pivotValue) { + high--; + swap(i, high, key, values); + } else { + // vi < pivotValue + i++; + } + } + // invariant: values[order[k]] == pivotValue for k in [0..low) + // invariant: values[order[k]] < pivotValue for k in [low..i) + // invariant: values[order[k]] > pivotValue for k in [high..end) + // assert i == high || low == high therefore, we are done with partition + + // at this point, i==high, from [start,low) are == pivot, [low,high) are < and [high,end) are > + // we have to move the values equal to the pivot into the middle. To do this, we swap pivot + // values into the top end of the [low,high) range stopping when we run out of destinations + // or when we run out of values to copy + int from = start; + int to = high - 1; + for (i = 0; from < low && to >= low; i++) { + swap(from++, to--, key, values); + } + if (from == low) { + // ran out of things to copy. This means that the the last destination is the boundary + low = to + 1; + } else { + // ran out of places to copy to. 
This means that there are uncopied pivots and the + // boundary is at the beginning of those + low = from; + } + +// checkPartition(order, values, pivotValue, start, low, high, end); + + // now recurse, but arrange it so we handle the longer limit by tail recursion + if (low - start < end - high) { + quickSort(key, values, start, low, limit); + + // this is really a way to do + // quickSort(order, values, high, end, limit); + start = high; + } else { + quickSort(key, values, high, end, limit); + // this is really a way to do + // quickSort(order, values, start, low, limit); + end = low; + } + } + } + + + /** + * Limited range insertion sort. We assume that no element has to move more than limit steps + * because quick sort has done its thing. This version works on parallel arrays of keys and values. + * + * @param key The array of keys + * @param values The values we are sorting + * @param start The starting point of the sort + * @param end The ending point of the sort + * @param limit The largest amount of disorder + */ + @SuppressWarnings("SameParameterValue") + private static void insertionSort(double[] key, double[][] values, int start, int end, int limit) { + // loop invariant: all values start ... i-1 are ordered + for (int i = start + 1; i < end; i++) { + double v = key[i]; + int m = Math.max(i - limit, start); + for (int j = i; j >= m; j--) { + if (j == m || key[j - 1] <= v) { + if (j < i) { + System.arraycopy(key, j, key, j + 1, i - j); + key[j] = v; + for (double[] value : values) { + double tmp = value[i]; + System.arraycopy(value, j, value, j + 1, i - j); + value[j] = tmp; + } + } + break; + } + } + } + } + + private static void swap(int[] order, int i, int j) { + int t = order[i]; + order[i] = order[j]; + order[j] = t; + } + + private static void swap(int i, int j, double[] key, double[]...values) { + double t = key[i]; + key[i] = key[j]; + key[j] = t; + + for (int k = 0; k < values.length; k++) { + t = values[k][i]; + values[k][i] = values[k][j]; + values[k][j] = t; + } + } + + /** + * Check that a partition step was done correctly. For debugging and testing. + * + * @param order The array of indexes representing a permutation of the keys. + * @param values The keys to sort. + * @param pivotValue The value that splits the data + * @param start The beginning of the data of interest. + * @param low Values from start (inclusive) to low (exclusive) are < pivotValue. + * @param high Values from low to high are equal to the pivot. + * @param end Values from high to end are above the pivot. 
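+ *
+ * <p>For example (an illustrative layout, not taken from the original patch): with pivotValue = 5 and
+ * values[order[0..6)] = {1, 3, 5, 5, 9, 8}, the call checkPartition(order, values, 5, 0, 2, 4, 6)
+ * passes, since indexes [0, 2) are below the pivot, [2, 4) equal it, and [4, 6) are above it.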
+ */ + @SuppressWarnings("UnusedDeclaration") + public static void checkPartition(int[] order, double[] values, double pivotValue, int start, int low, int high, int end) { + if (order.length != values.length) { + throw new IllegalArgumentException("Arguments must be same size"); + } + + if (!(start >= 0 && low >= start && high >= low && end >= high)) { + throw new IllegalArgumentException(String.format("Invalid indices %d, %d, %d, %d", start, low, high, end)); + } + + for (int i = 0; i < low; i++) { + double v = values[order[i]]; + if (v >= pivotValue) { + throw new IllegalArgumentException(String.format("Value greater than pivot at %d", i)); + } + } + + for (int i = low; i < high; i++) { + if (values[order[i]] != pivotValue) { + throw new IllegalArgumentException(String.format("Non-pivot at %d", i)); + } + } + + for (int i = high; i < end; i++) { + double v = values[order[i]]; + if (v <= pivotValue) { + throw new IllegalArgumentException(String.format("Value less than pivot at %d", i)); + } + } + } + + /** + * Limited range insertion sort. We assume that no element has to move more than limit steps + * because quick sort has done its thing. + * + * @param order The permutation index + * @param values The values we are sorting + * @param start Where to start the sort + * @param n How many elements to sort + * @param limit The largest amount of disorder + */ + @SuppressWarnings("SameParameterValue") + private static void insertionSort(int[] order, double[] values, int start, int n, int limit) { + for (int i = start + 1; i < n; i++) { + int t = order[i]; + double v = values[order[i]]; + int m = Math.max(i - limit, start); + for (int j = i; j >= m; j--) { + if (j == 0 || values[order[j - 1]] <= v) { + if (j < i) { + System.arraycopy(order, j, order, j + 1, i - j); + order[j] = t; + } + break; + } + } + } + } + + /** + * Reverses an array in-place. + * + * @param order The array to reverse + */ + @SuppressWarnings("WeakerAccess") + public static void reverse(int[] order) { + reverse(order, 0, order.length); + } + + /** + * Reverses part of an array. See {@link #reverse(int[])} + * + * @param order The array containing the data to reverse. + * @param offset Where to start reversing. + * @param length How many elements to reverse + */ + @SuppressWarnings("WeakerAccess") + public static void reverse(int[] order, int offset, int length) { + for (int i = 0; i < length / 2; i++) { + int t = order[offset + i]; + order[offset + i] = order[offset + length - i - 1]; + order[offset + length - i - 1] = t; + } + } + + /** + * Reverses part of an array. See {@link #reverse(int[])} + * + * @param order The array containing the data to reverse. + * @param offset Where to start reversing. + * @param length How many elements to reverse + */ + @SuppressWarnings({"WeakerAccess", "SameParameterValue"}) + public static void reverse(double[] order, int offset, int length) { + for (int i = 0; i < length / 2; i++) { + double t = order[offset + i]; + order[offset + i] = order[offset + length - i - 1]; + order[offset + length - i - 1] = t; + } + } +} \ No newline at end of file diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/TDigest.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/TDigest.java new file mode 100644 index 0000000000000..ec41b52329a5e --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/TDigest.java @@ -0,0 +1,226 @@ +/* + * Licensed to Ted Dunning under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.operator.scalar; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +/** + * Adaptive histogram based on something like streaming k-means crossed with Q-digest. + * + * The special characteristics of this algorithm are: + * + * - smaller summaries than Q-digest + * + * - works on doubles as well as integers. + * + * - provides part per million accuracy for extreme quantiles and typically <1000 ppm accuracy for middle quantiles + * + * - fast + * + * - simple + * + * - test coverage roughly at 90% + * + * - easy to adapt for use with map-reduce + */ +public abstract class TDigest implements Serializable { + protected ScaleFunction scale = ScaleFunction.K_2; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + + /** + * Creates an {@link MergingDigest}. This is generally the best known implementation right now. + * + * @param compression The compression parameter. 100 is a common value for normal uses. 1000 is extremely large. + * The number of centroids retained will be a smallish (usually less than 10) multiple of this number. + * @return the MergingDigest + */ + @SuppressWarnings("WeakerAccess") + public static TDigest createMergingDigest(double compression) { + return new MergingDigest(compression); + } + + /** + * Creates a TDigest of whichever type is the currently recommended type. MergingDigest is generally the best + * known implementation right now. + * + * @param compression The compression parameter. 100 is a common value for normal uses. 1000 is extremely large. + * The number of centroids retained will be a smallish (usually less than 10) multiple of this number. + * @return the TDigest + */ + @SuppressWarnings({"unused", "WeakerAccess", "SameParameterValue"}) + public static TDigest createDigest(double compression) { + return createMergingDigest(compression); + } + + /** + * Adds a sample to a histogram. + * + * @param x The value to add. + * @param w The weight of this point. + */ + public abstract void add(double x, int w); + + final void checkValue(double x) { + if (Double.isNaN(x)) { + throw new IllegalArgumentException("Cannot add NaN"); + } + } + + public abstract void add(List others); + + /** + * Re-examines a t-digest to determine whether some centroids are redundant. If your data are + * perversely ordered, this may be a good idea. Even if not, this may save 20% or so in space. + * + * The cost is roughly the same as adding as many data points as there are centroids. This + * is typically < 10 * compression, but could be as high as 100 * compression. + * + * This is a destructive operation that is not thread-safe. + */ + public abstract void compress(); + + /** + * Returns the number of points that have been added to this TDigest. 
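+ *
+ * <p>A minimal usage sketch (illustrative only, not part of the original patch): the size is the
+ * total weight added, not the number of centroids.
+ * <pre>{@code
+ * TDigest digest = TDigest.createDigest(100);
+ * digest.add(1.5);       // adds weight 1
+ * digest.add(2.5, 3);    // adds weight 3
+ * // digest.size() == 4; digest.quantile(0.5) estimates the median of the added values
+ * }</pre>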
+ * + * @return The sum of the weights on all centroids. + */ + public abstract long size(); + + /** + * Returns the fraction of all points added which are ≤ x. + * + * @param x The cutoff for the cdf. + * @return The fraction of all data which is less than or equal to x. + */ + public abstract double cdf(double x); + + /** + * Returns an estimate of the cutoff such that a specified fraction of the data + * added to this TDigest would be less than or equal to the cutoff. + * + * @param q The desired fraction + * @return The value x such that cdf(x) == q + */ + public abstract double quantile(double q); + + /** + * A {@link Collection} that lets you go through the centroids in ascending order by mean. Centroids + * returned will not be re-used, but may or may not share storage with this TDigest. + * + * @return The centroids in the form of a Collection. + */ + public abstract Collection centroids(); + + /** + * Returns the current compression factor. + * + * @return The compression factor originally used to set up the TDigest. + */ + public abstract double compression(); + + /** + * Returns the number of bytes required to encode this TDigest using #asBytes(). + * + * @return The number of bytes required. + */ + public abstract int byteSize(); + + /** + * Returns the number of bytes required to encode this TDigest using #asSmallBytes(). + * + * Note that this is just as expensive as actually compressing the digest. If you don't + * care about time, but want to never over-allocate, this is fine. If you care about compression + * and speed, you pretty much just have to over-allocate by allocating #byteSize() bytes. + * + * @return The number of bytes required. + */ + public abstract int smallByteSize(); + + public void setScaleFunction(ScaleFunction scaleFunction) { + if (scaleFunction.toString().endsWith("NO_NORM")) { + throw new IllegalArgumentException( + String.format("Can't use %s as scale with %s", scaleFunction, this.getClass())); + } + this.scale = scaleFunction; + } + + /** + * Serialize this TDigest into a byte buffer. Note that the serialization used is + * very straightforward and is considerably larger than strictly necessary. + * + * @param buf The byte buffer into which the TDigest should be serialized. + */ + public abstract void asBytes(ByteBuffer buf); + + /** + * Serialize this TDigest into a byte buffer. Some simple compression is used + * such as using variable byte representation to store the centroid weights and + * using delta-encoding on the centroid means so that floats can be reasonably + * used to store the centroid means. + * + * @param buf The byte buffer into which the TDigest should be serialized. + */ + public abstract void asSmallBytes(ByteBuffer buf); + + /** + * Tell this TDigest to record the original data as much as possible for test + * purposes. + * + * @return This TDigest so that configurations can be done in fluent style. + */ + public abstract TDigest recordAllData(); + + public abstract boolean isRecording(); + + /** + * Add a sample to this TDigest. + * + * @param x The data value to add + */ + public abstract void add(double x); + + /** + * Add all of the centroids of another TDigest to this one. 
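+ *
+ * <p>A minimal merge sketch (illustrative only, not part of the original patch):
+ * <pre>{@code
+ * TDigest left = TDigest.createMergingDigest(100);
+ * TDigest right = TDigest.createMergingDigest(100);
+ * left.add(1.0);
+ * right.add(2.0);
+ * left.add(right);                 // left now summarizes both inputs
+ * double median = left.quantile(0.5);
+ * }</pre>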
+ * + * @param other The other TDigest + */ + public abstract void add(TDigest other); + + public abstract int centroidCount(); + + public double getMin() { + return min; + } + + public double getMax() { + return max; + } + + /** + * Over-ride the min and max values for testing purposes + */ + @SuppressWarnings("SameParameterValue") + void setMinMax(double min, double max) { + this.min = min; + this.max = max; + } +} \ No newline at end of file diff --git a/presto-main/src/test/java/com/facebook/presto/operator/scalar/BenchmarkTDigest.java b/presto-main/src/test/java/com/facebook/presto/operator/scalar/BenchmarkTDigest.java new file mode 100644 index 0000000000000..318f89f884a58 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/operator/scalar/BenchmarkTDigest.java @@ -0,0 +1,246 @@ +package com.facebook.presto.operator.scalar; + +import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.VerboseMode; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(2) +@Warmup(iterations = 5) +@Measurement(iterations = 10) +public class BenchmarkTDigest +{ + private static final int NUMBER_OF_ENTRIES = 1_000_000; + private static final int STANDARD_COMPRESSION_FACTOR = 100; + private static final double STANDARD_MAX_ERROR = 0.01; + + @State(Scope.Thread) + public static class Data + { + private long[] normalDistribution1; + private long[] normalDistribution2; + private long[] randomDistribution1; + private long[] randomDistribution2; + + @Setup + public void setup() + { + normalDistribution1 = makeNormalValues(NUMBER_OF_ENTRIES); + normalDistribution2 = makeNormalValues(NUMBER_OF_ENTRIES); + randomDistribution1 = makeRandomValues(NUMBER_OF_ENTRIES); + randomDistribution2 = makeRandomValues(NUMBER_OF_ENTRIES); + } + + private long[] makeNormalValues(int size) + { + long[] values = new long[size]; + for (int i = 0; i < size; i++) { + // generate values from a large domain but not many distinct values + long value = Math.abs((long) (ThreadLocalRandom.current().nextGaussian() * 1_000_000_000)); + values[i] = value; + } + + return values; + } + + private long[] makeRandomValues(int size) + { + long[] values = new long[size]; + for (int i = 0; i < size; i++) { + values[i] = (long) (Math.random() * 1_000_000_000); + } + + return values; + } + } + + @State(Scope.Thread) + public static class Digest + { + protected TDigest digest1; + protected TDigest digest2; + protected QuantileDigest digest3; + protected QuantileDigest digest4; + protected Slice serializedDigest1; + protected Slice serializedDigest3; + + @Setup + public void setup(Data data) + { + digest1 = makeTDigest(data.normalDistribution1); + digest2 = 
makeTDigest(data.randomDistribution1); + digest3 = makeQDigest(data.normalDistribution1); + digest4 = makeQDigest(data.randomDistribution1); + serializedDigest1 = digest1.serialize(); + serializedDigest3 = digest3.serialize(); + } + + private TDigest makeTDigest(long[] values) + { + TDigest result = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + int k = 1_000_000 / NUMBER_OF_ENTRIES; + for (int i = 0; i < k; i++) { + for (long value : values) { + result.add(value); + } + } + return result; + } + + private QuantileDigest makeQDigest(long[] values) + { + QuantileDigest result = new QuantileDigest(STANDARD_MAX_ERROR); + for (long value : values) { + result.add(value); + } + return result; + } + } + + @State(Scope.Thread) + public static class DigestWithQuantile + extends Digest + { + @Param({"0.0001", "0.01", "0.2500", "0.5000", "0.7500", "0.9999"}) + float quantile; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OperationsPerInvocation(NUMBER_OF_ENTRIES) + public TDigest benchmarkInsertsT(Data data) + { + TDigest digest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + int k = 1_000_000 / NUMBER_OF_ENTRIES; + for (int i = 0; i < k; i++) { + for (long value : data.normalDistribution1) { + digest.add(value); + } + } + + return digest; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OperationsPerInvocation(NUMBER_OF_ENTRIES) + public QuantileDigest benchmarkInsertsQ(Data data) + { + QuantileDigest digest = new QuantileDigest(STANDARD_MAX_ERROR); + + for (int i = 0; i < 10; i++) { + for (long value : data.normalDistribution1) { + digest.add(value); + } + } + return digest; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public double benchmarkQuantilesT(DigestWithQuantile data) + { + return data.digest1.quantile(data.quantile); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public long benchmarkQuantilesQ(DigestWithQuantile data) + { + return data.digest3.getQuantile(data.quantile); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkCopyT(Digest data) + { + TDigest copy = TDigest.createMergingDigest(data.digest1.compression()); + copy.add(ImmutableList.of(data.digest1)); + return copy; + //alternatively + //return MergingDigest.deserialize(data.digest1.serialize()); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public QuantileDigest benchmarkCopyQ(Digest data) + { + return new QuantileDigest(data.digest3); + } + + @Benchmark @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkMergeT(Digest data) + { + TDigest merged = MergingDigest.deserialize(data.digest1.serialize()); + merged.add(data.digest2); + return merged; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public QuantileDigest benchmarkMergeQ(Digest data) + { + QuantileDigest merged = new QuantileDigest(data.digest3); + merged.merge(data.digest4); + return merged; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public TDigest benchmarkDeserializeT(Digest data) + { + return MergingDigest.deserialize(data.serializedDigest1); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public QuantileDigest benchmarkDeserializeQ(Digest data) + { + return new QuantileDigest(data.serializedDigest3); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public Slice benchmarkSerializeT(Digest data) + { + return data.digest1.serialize(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + public Slice benchmarkSerializeQ(Digest data) + { + return data.digest3.serialize(); + } + + public static void main(String[] args) + 
throws RunnerException + { + Options options = new OptionsBuilder() + .verbosity(VerboseMode.NORMAL) + .include(".*" + BenchmarkTDigest.class.getSimpleName() + ".*") + //.addProfiler(GCProfiler.class) + .build(); + + new Runner(options).run(); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/operator/scalar/QuantileDigest.java b/presto-main/src/test/java/com/facebook/presto/operator/scalar/QuantileDigest.java new file mode 100644 index 0000000000000..1681582351538 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/operator/scalar/QuantileDigest.java @@ -0,0 +1,1162 @@ +package com.facebook.presto.operator.scalar; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.Iterators; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.Ordering; +import com.google.common.collect.PeekingIterator; +import com.google.common.util.concurrent.AtomicDouble; +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.SizeOf; +import io.airlift.slice.Slice; +import io.airlift.slice.SliceInput; +import io.airlift.slice.SliceOutput; +import org.openjdk.jol.info.ClassLayout; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/* + This is a copy of the same class in the Airlift library, with the addition of a method to calculate + the truncated mean. This is intended to be temporary, as we are working on merging in changes to the + public class to enable us to write the truncated mean method outside of the class body: + https://github.com/airlift/airlift/issues/710 + */ +@NotThreadSafe +public class QuantileDigest +{ + private static final int MAX_BITS = 64; + private static final int QUANTILE_DIGEST_SIZE = ClassLayout.parseClass(QuantileDigest.class).instanceSize(); + static final long RESCALE_THRESHOLD_SECONDS = 50L; + static final double ZERO_WEIGHT_THRESHOLD = 1.0E-5D; + private static final int INITIAL_CAPACITY = 1; + private final double maxError; + private final Ticker ticker; + private final double alpha; + private long landmarkInSeconds; + private double weightedCount; + private long max; + private long min; + private int root; + private int nextNode; + private double[] counts; + private byte[] levels; + private long[] values; + private int[] lefts; + private int[] rights; + private int freeCount; + private int firstFree; + + public QuantileDigest(double maxError) + { + this(maxError, 0.0D); + } + + public QuantileDigest(double maxError, double alpha) + { + this(maxError, alpha, alpha == 0.0D ? 
noOpTicker() : Ticker.systemTicker()); + } + + @VisibleForTesting + QuantileDigest(double maxError, double alpha, Ticker ticker) + { + this.max = -9223372036854775808L; + this.min = 9223372036854775807L; + this.root = -1; + this.firstFree = -1; + Preconditions.checkArgument(maxError >= 0.0D && maxError <= 1.0D, "maxError must be in range [0, 1]"); + Preconditions.checkArgument(alpha >= 0.0D && alpha < 1.0D, "alpha must be in range [0, 1)"); + this.maxError = maxError; + this.alpha = alpha; + this.ticker = ticker; + this.landmarkInSeconds = TimeUnit.NANOSECONDS.toSeconds(ticker.read()); + this.counts = new double[1]; + this.levels = new byte[1]; + this.values = new long[1]; + this.lefts = new int[1]; + this.rights = new int[1]; + Arrays.fill(this.lefts, -1); + Arrays.fill(this.rights, -1); + } + + public QuantileDigest(QuantileDigest other) + { + this.max = -9223372036854775808L; + this.min = 9223372036854775807L; + this.root = -1; + this.firstFree = -1; + this.maxError = other.maxError; + this.alpha = other.alpha; + this.ticker = this.alpha == 0.0D ? noOpTicker() : Ticker.systemTicker(); + this.landmarkInSeconds = other.landmarkInSeconds; + this.weightedCount = other.weightedCount; + this.max = other.max; + this.min = other.min; + this.root = other.root; + this.nextNode = other.nextNode; + this.counts = (double[]) other.counts.clone(); + this.levels = (byte[]) other.levels.clone(); + this.values = (long[]) other.values.clone(); + this.lefts = (int[]) other.lefts.clone(); + this.rights = (int[]) other.rights.clone(); + this.freeCount = other.freeCount; + this.firstFree = other.firstFree; + } + + public QuantileDigest(Slice serialized) + { + this.max = -9223372036854775808L; + this.min = 9223372036854775807L; + this.root = -1; + this.firstFree = -1; + SliceInput input = new BasicSliceInput(serialized); + byte format = input.readByte(); + Preconditions.checkArgument(format == 0, "Invalid format"); + this.maxError = input.readDouble(); + this.alpha = input.readDouble(); + if (this.alpha == 0.0D) { + this.ticker = noOpTicker(); + } + else { + this.ticker = Ticker.systemTicker(); + } + + this.landmarkInSeconds = input.readLong(); + this.min = input.readLong(); + this.max = input.readLong(); + int nodeCount = input.readInt(); + int numberOfLevels = 64 - Long.numberOfLeadingZeros(this.min ^ this.max) + 1; + double k = (double) (3 * numberOfLevels) / this.maxError; + Preconditions.checkArgument((double) nodeCount <= 2.0D * k, "Too many nodes in deserialized tree. Possible corruption"); + this.counts = new double[nodeCount]; + this.levels = new byte[nodeCount]; + this.values = new long[nodeCount]; + int[] stack = new int[(Integer.highestOneBit(nodeCount - 1) << 1) + 1]; + int top = -1; + this.lefts = new int[nodeCount]; + this.rights = new int[nodeCount]; + + for (int node = 0; node < nodeCount; ++node) { + byte nodeStructure = input.readByte(); + boolean hasRight = (nodeStructure & 2) != 0; + boolean hasLeft = (nodeStructure & 1) != 0; + byte level = (byte) (nodeStructure >>> 2 & 63); + if (hasLeft || hasRight) { + ++level; + } + + this.levels[node] = level; + if (hasRight) { + this.rights[node] = stack[top--]; + } + else { + this.rights[node] = -1; + } + + if (hasLeft) { + this.lefts[node] = stack[top--]; + } + else { + this.lefts[node] = -1; + } + + ++top; + stack[top] = node; + double count = input.readDouble(); + this.weightedCount += count; + this.counts[node] = count; + this.values[node] = input.readLong(); + } + + Preconditions.checkArgument(nodeCount == 0 || top == 0, "Tree is corrupted. 
Expected a single root node"); + this.root = nodeCount - 1; + this.nextNode = nodeCount; + } + + public double getMaxError() + { + return this.maxError; + } + + public double getAlpha() + { + return this.alpha; + } + + public void add(long value) + { + this.add(value, 1L); + } + + public void add(long value, long count) + { + Preconditions.checkArgument(count > 0L, "count must be > 0"); + boolean needsCompression = false; + double weight = (double) count; + if (this.alpha > 0.0D) { + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(this.ticker.read()); + if (nowInSeconds - this.landmarkInSeconds >= 50L) { + this.rescale(nowInSeconds); + needsCompression = true; + } + + weight = this.weight(nowInSeconds) * (double) count; + } + + this.max = Math.max(this.max, value); + this.min = Math.min(this.min, value); + double previousCount = this.weightedCount; + this.insert(longToBits(value), weight); + int compressionFactor = this.calculateCompressionFactor(); + if (needsCompression || (long) previousCount / (long) compressionFactor != (long) this.weightedCount / (long) compressionFactor) { + this.compress(); + } + } + + public void merge(QuantileDigest other) + { + this.rescaleToCommonLandmark(this, other); + this.root = this.merge(this.root, other, other.root); + this.max = Math.max(this.max, other.max); + this.min = Math.min(this.min, other.min); + this.compress(); + } + + public List getQuantilesLowerBound(List quantiles) + { + Preconditions.checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order"); + Iterator var2 = quantiles.iterator(); + + while (var2.hasNext()) { + double quantile = (Double) var2.next(); + Preconditions.checkArgument(quantile >= 0.0D && quantile <= 1.0D, "quantile must be between [0,1]"); + } + + List reversedQuantiles = ImmutableList.copyOf(quantiles).reverse(); + final Builder builder = ImmutableList.builder(); + final PeekingIterator iterator = Iterators.peekingIterator(reversedQuantiles.iterator()); + this.postOrderTraversal(this.root, new QuantileDigest.Callback() + { + private double sum; + + public boolean process(int node) + { + this.sum += QuantileDigest.this.counts[node]; + + while (iterator.hasNext() && this.sum > (1.0D - (Double) iterator.peek()) * QuantileDigest.this.weightedCount) { + iterator.next(); + long value = Math.max(QuantileDigest.this.lowerBound(node), QuantileDigest.this.min); + builder.add(value); + } + + return iterator.hasNext(); + } + }, QuantileDigest.TraversalOrder.REVERSE); + + while (iterator.hasNext()) { + builder.add(this.min); + iterator.next(); + } + + return builder.build().reverse(); + } + + public List getQuantilesUpperBound(List quantiles) + { + Preconditions.checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order"); + Iterator var2 = quantiles.iterator(); + + while (var2.hasNext()) { + double quantile = (Double) var2.next(); + Preconditions.checkArgument(quantile >= 0.0D && quantile <= 1.0D, "quantile must be between [0,1]"); + } + + final Builder builder = ImmutableList.builder(); + final PeekingIterator iterator = Iterators.peekingIterator(quantiles.iterator()); + this.postOrderTraversal(this.root, new QuantileDigest.Callback() + { + private double sum; + + public boolean process(int node) + { + this.sum += QuantileDigest.this.counts[node]; + + while (iterator.hasNext() && this.sum > (Double) iterator.peek() * QuantileDigest.this.weightedCount) { + iterator.next(); + long value = Math.min(QuantileDigest.this.upperBound(node), 
QuantileDigest.this.max); + builder.add(value); + } + + return iterator.hasNext(); + } + }); + + while (iterator.hasNext()) { + builder.add(this.max); + iterator.next(); + } + + return builder.build(); + } + + public List getQuantiles(List quantiles) + { + return this.getQuantilesUpperBound(quantiles); + } + + public long getQuantile(double quantile) + { + return (Long) this.getQuantiles(ImmutableList.of(quantile)).get(0); + } + + public long getQuantileLowerBound(double quantile) + { + return (Long) this.getQuantilesLowerBound(ImmutableList.of(quantile)).get(0); + } + + public long getQuantileUpperBound(double quantile) + { + return (Long) this.getQuantilesUpperBound(ImmutableList.of(quantile)).get(0); + } + + public double getCount() + { + return this.weightedCount / this.weight(TimeUnit.NANOSECONDS.toSeconds(this.ticker.read())); + } + + public List getHistogram(List bucketUpperBounds) + { + return this.getHistogram(bucketUpperBounds, QuantileDigest.MiddleFunction.DEFAULT); + } + + public List getHistogram(List bucketUpperBounds, QuantileDigest.MiddleFunction middleFunction) + { + Preconditions.checkArgument(Ordering.natural().isOrdered(bucketUpperBounds), "buckets must be sorted in increasing order"); + Builder builder = ImmutableList.builder(); + PeekingIterator iterator = Iterators.peekingIterator(bucketUpperBounds.iterator()); + QuantileDigest.HistogramBuilderStateHolder holder = new QuantileDigest.HistogramBuilderStateHolder(); + double normalizationFactor = this.weight(TimeUnit.NANOSECONDS.toSeconds(this.ticker.read())); + this.postOrderTraversal(this.root, (node) -> { + while (iterator.hasNext() && (Long) iterator.peek() <= this.upperBound(node)) { + double bucketCount = holder.sum - holder.lastSum; + QuantileDigest.Bucket bucket = new QuantileDigest.Bucket(bucketCount / normalizationFactor, holder.bucketWeightedSum / bucketCount); + builder.add(bucket); + holder.lastSum = holder.sum; + holder.bucketWeightedSum = 0.0D; + iterator.next(); + } + + holder.bucketWeightedSum += middleFunction.middle(this.lowerBound(node), this.upperBound(node)) * this.counts[node]; + holder.sum += this.counts[node]; + return iterator.hasNext(); + }); + + while (iterator.hasNext()) { + double bucketCount = holder.sum - holder.lastSum; + QuantileDigest.Bucket bucket = new QuantileDigest.Bucket(bucketCount / normalizationFactor, holder.bucketWeightedSum / bucketCount); + builder.add(bucket); + iterator.next(); + } + + return builder.build(); + } + + public long getMin() + { + AtomicLong chosen = new AtomicLong(this.min); + this.postOrderTraversal(this.root, (node) -> { + if (this.counts[node] >= 1.0E-5D) { + chosen.set(this.lowerBound(node)); + return false; + } + else { + return true; + } + }, QuantileDigest.TraversalOrder.FORWARD); + return Math.max(this.min, chosen.get()); + } + + public long getMax() + { + AtomicLong chosen = new AtomicLong(this.max); + this.postOrderTraversal(this.root, (node) -> { + if (this.counts[node] >= 1.0E-5D) { + chosen.set(this.upperBound(node)); + return false; + } + else { + return true; + } + }, QuantileDigest.TraversalOrder.REVERSE); + return Math.min(this.max, chosen.get()); + } + + /** + * Get the approx truncated mean from the digest. + * The mean is computed as a weighted average of values between the upper and lower quantiles (inclusive) + * When the rank of a quantile is non-integer, a portion of the nearest rank's value is included in the mean + * according to its fraction within the quantile bound. + *
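+ * <p>A minimal call sketch (illustrative only, not part of the original patch):
+ * <pre>{@code
+ * QuantileDigest digest = new QuantileDigest(0.01);
+ * for (long v = 1; v <= 100; v++) {
+ *     digest.add(v);
+ * }
+ * // approximate mean of the values whose ranks fall between the 0.25 and 0.75 quantiles
+ * Double truncatedMean = digest.getTruncatedMean(0.25, 0.75, TruncMeanValueFunction.DEFAULT);
+ * }</pre>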

+ * If the value in the digest needs to be converted before the mean is calculated, + * valueFunction can be used. Else, pass in the identity function. + */ + public Double getTruncatedMean(double lowerQuantile, double upperQuantile, TruncMeanValueFunction valueFunction) + { + if (weightedCount == 0 || lowerQuantile >= upperQuantile) { + return null; + } + + AtomicDouble meanResult = new AtomicDouble(); + double lowerRank = lowerQuantile * weightedCount; + double upperRank = upperQuantile * weightedCount; + + postOrderTraversal(root, new Callback() + { + private double sum; + private double count; + private double mean; + + public boolean process(int node) + { + double nodeCount = counts[node]; + if (nodeCount == 0) { + return true; + } + + sum += nodeCount; + + double amountOverLower = Math.max(sum - lowerRank, 0); + double amountOverUpper = Math.max(sum - upperRank, 0); + // Constrain node count to the rank of the lower and upper bound quantiles + nodeCount = Math.max(0, Math.min(Math.min(nodeCount, amountOverLower), nodeCount - amountOverUpper)); + + if (amountOverLower > 0) { + double value = valueFunction.value(Math.min(upperBound(node), max)); + mean = (mean * count + nodeCount * value) / (count + nodeCount); + count += nodeCount; + } + + if (amountOverUpper > 0 || sum == weightedCount) { + meanResult.set(mean); + return false; + } + return true; + } + }); + + return meanResult.get(); + } + + public long estimatedInMemorySizeInBytes() + { + return ((long) QUANTILE_DIGEST_SIZE + SizeOf.sizeOf(this.counts) + SizeOf.sizeOf(this.levels) + SizeOf.sizeOf(this.values) + SizeOf.sizeOf(this.lefts) + SizeOf.sizeOf(this.rights)); + } + + public int estimatedSerializedSizeInBytes() + { + int nodeSize = 17; + return 45 + this.getNodeCount() * nodeSize; + } + + public Slice serialize() + { + this.compress(); + SliceOutput output = new DynamicSliceOutput(this.estimatedSerializedSizeInBytes()); + output.writeByte(0); + output.writeDouble(this.maxError); + output.writeDouble(this.alpha); + output.writeLong(this.landmarkInSeconds); + output.writeLong(this.min); + output.writeLong(this.max); + output.writeInt(this.getNodeCount()); + final int[] nodes = new int[this.getNodeCount()]; + this.postOrderTraversal(this.root, new QuantileDigest.Callback() + { + int index; + + public boolean process(int node) + { + nodes[this.index++] = node; + return true; + } + }); + int[] var3 = nodes; + int var4 = nodes.length; + + for (int var5 = 0; var5 < var4; ++var5) { + int node = var3[var5]; + byte nodeStructure = (byte) (Math.max(this.levels[node] - 1, 0) << 2); + if (this.lefts[node] != -1) { + nodeStructure = (byte) (nodeStructure | 1); + } + + if (this.rights[node] != -1) { + nodeStructure = (byte) (nodeStructure | 2); + } + + output.writeByte(nodeStructure); + output.writeDouble(this.counts[node]); + output.writeLong(this.values[node]); + } + + return output.slice(); + } + + @VisibleForTesting + int getNodeCount() + { + return this.nextNode - this.freeCount; + } + + @VisibleForTesting + void compress() + { + double bound = Math.floor(this.weightedCount / (double) this.calculateCompressionFactor()); + this.postOrderTraversal(this.root, (node) -> { + int left = this.lefts[node]; + int right = this.rights[node]; + if (left == -1 && right == -1) { + return true; + } + else { + double leftCount = left == -1 ? 0.0D : this.counts[left]; + double rightCount = right == -1 ? 
0.0D : this.counts[right]; + boolean shouldCompress = this.counts[node] + leftCount + rightCount < bound; + double[] var10000; + if (left != -1 && (shouldCompress || leftCount < 1.0E-5D)) { + this.lefts[node] = this.tryRemove(left); + var10000 = this.counts; + var10000[node] += leftCount; + } + + if (right != -1 && (shouldCompress || rightCount < 1.0E-5D)) { + this.rights[node] = this.tryRemove(right); + var10000 = this.counts; + var10000[node] += rightCount; + } + + return true; + } + }); + if (this.root != -1 && this.counts[this.root] < 1.0E-5D) { + this.root = this.tryRemove(this.root); + } + } + + private double weight(long timestamp) + { + return Math.exp(this.alpha * (double) (timestamp - this.landmarkInSeconds)); + } + + private void rescale(long newLandmarkInSeconds) + { + double factor = Math.exp(-this.alpha * (double) (newLandmarkInSeconds - this.landmarkInSeconds)); + this.weightedCount *= factor; + + for (int i = 0; i < this.nextNode; ++i) { + double[] var10000 = this.counts; + var10000[i] *= factor; + } + + this.landmarkInSeconds = newLandmarkInSeconds; + } + + private int calculateCompressionFactor() + { + return this.root == -1 ? 1 : Math.max((int) ((double) (this.levels[this.root] + 1) / this.maxError), 1); + } + + private void insert(long value, double count) + { + if (count >= 1.0E-5D) { + long lastBranch = 0L; + int parent = -1; + int current = this.root; + + while (current != -1) { + long currentValue = this.values[current]; + byte currentLevel = this.levels[current]; + if (!inSameSubtree(value, currentValue, currentLevel)) { + this.setChild(parent, lastBranch, this.makeSiblings(current, this.createLeaf(value, count))); + return; + } + + if (currentLevel == 0 && currentValue == value) { + double[] var10000 = this.counts; + var10000[current] += count; + this.weightedCount += count; + return; + } + + long branch = value & this.getBranchMask(currentLevel); + parent = current; + lastBranch = branch; + if (branch == 0L) { + current = this.lefts[current]; + } + else { + current = this.rights[current]; + } + } + + this.setChild(parent, lastBranch, this.createLeaf(value, count)); + } + } + + private void setChild(int parent, long branch, int child) + { + if (parent == -1) { + this.root = child; + } + else if (branch == 0L) { + this.lefts[parent] = child; + } + else { + this.rights[parent] = child; + } + } + + private int makeSiblings(int first, int second) + { + long firstValue = this.values[first]; + long secondValue = this.values[second]; + int parentLevel = 64 - Long.numberOfLeadingZeros(firstValue ^ secondValue); + int parent = this.createNode(firstValue, parentLevel, 0.0D); + long branch = firstValue & this.getBranchMask(this.levels[parent]); + if (branch == 0L) { + this.lefts[parent] = first; + this.rights[parent] = second; + } + else { + this.lefts[parent] = second; + this.rights[parent] = first; + } + + return parent; + } + + private int createLeaf(long value, double count) + { + return this.createNode(value, 0, count); + } + + private int createNode(long value, int level, double count) + { + int node = this.popFree(); + if (node == -1) { + if (this.nextNode == this.counts.length) { + int newSize = this.counts.length + Math.min(this.counts.length, this.calculateCompressionFactor() / 5 + 1); + this.counts = Arrays.copyOf(this.counts, newSize); + this.levels = Arrays.copyOf(this.levels, newSize); + this.values = Arrays.copyOf(this.values, newSize); + this.lefts = Arrays.copyOf(this.lefts, newSize); + this.rights = Arrays.copyOf(this.rights, newSize); + } + + node = 
this.nextNode++; + } + + this.weightedCount += count; + this.values[node] = value; + this.levels[node] = (byte) level; + this.counts[node] = count; + this.lefts[node] = -1; + this.rights[node] = -1; + return node; + } + + private int merge(int node, QuantileDigest other, int otherNode) + { + if (otherNode == -1) { + return node; + } + else if (node == -1) { + return this.copyRecursive(other, otherNode); + } + else if (!inSameSubtree(this.values[node], other.values[otherNode], Math.max(this.levels[node], other.levels[otherNode]))) { + return this.makeSiblings(node, this.copyRecursive(other, otherNode)); + } + else { + int left; + long branch; + if (this.levels[node] > other.levels[otherNode]) { + branch = other.values[otherNode] & this.getBranchMask(this.levels[node]); + if (branch == 0L) { + left = this.merge(this.lefts[node], other, otherNode); + this.lefts[node] = left; + } + else { + left = this.merge(this.rights[node], other, otherNode); + this.rights[node] = left; + } + + return node; + } + else if (this.levels[node] < other.levels[otherNode]) { + branch = this.values[node] & this.getBranchMask(other.levels[otherNode]); + int right; + if (branch == 0L) { + left = this.merge(node, other, other.lefts[otherNode]); + right = this.copyRecursive(other, other.rights[otherNode]); + } + else { + left = this.copyRecursive(other, other.lefts[otherNode]); + right = this.merge(node, other, other.rights[otherNode]); + } + + int result = this.createNode(other.values[otherNode], other.levels[otherNode], other.counts[otherNode]); + this.lefts[result] = left; + this.rights[result] = right; + return result; + } + else { + this.weightedCount += other.counts[otherNode]; + double[] var10000 = this.counts; + var10000[node] += other.counts[otherNode]; + left = this.merge(this.lefts[node], other, other.lefts[otherNode]); + int right = this.merge(this.rights[node], other, other.rights[otherNode]); + this.lefts[node] = left; + this.rights[node] = right; + return node; + } + } + } + + private static boolean inSameSubtree(long bitsA, long bitsB, int level) + { + return level == 64 || bitsA >>> level == bitsB >>> level; + } + + private int copyRecursive(QuantileDigest other, int otherNode) + { + if (otherNode == -1) { + return otherNode; + } + else { + int node = this.createNode(other.values[otherNode], other.levels[otherNode], other.counts[otherNode]); + int right; + if (other.lefts[otherNode] != -1) { + right = this.copyRecursive(other, other.lefts[otherNode]); + this.lefts[node] = right; + } + + if (other.rights[otherNode] != -1) { + right = this.copyRecursive(other, other.rights[otherNode]); + this.rights[node] = right; + } + + return node; + } + } + + private int tryRemove(int node) + { + Preconditions.checkArgument(node != -1, "node is -1"); + int left = this.lefts[node]; + int right = this.rights[node]; + if (left == -1 && right == -1) { + this.remove(node); + return -1; + } + else if (left != -1 && right != -1) { + this.counts[node] = 0.0D; + return node; + } + else { + this.remove(node); + return left != -1 ? 
left : right; + } + } + + private void remove(int node) + { + if (node == this.nextNode - 1) { + --this.nextNode; + } + else { + this.pushFree(node); + } + + if (node == this.root) { + this.root = -1; + } + } + + private void pushFree(int node) + { + this.lefts[node] = this.firstFree; + this.firstFree = node; + ++this.freeCount; + } + + private int popFree() + { + int node = this.firstFree; + if (node == -1) { + return node; + } + else { + this.firstFree = this.lefts[this.firstFree]; + --this.freeCount; + return node; + } + } + + private void postOrderTraversal(int node, QuantileDigest.Callback callback) + { + this.postOrderTraversal(node, callback, QuantileDigest.TraversalOrder.FORWARD); + } + + private void postOrderTraversal(int node, QuantileDigest.Callback callback, QuantileDigest.TraversalOrder order) + { + if (order == QuantileDigest.TraversalOrder.FORWARD) { + this.postOrderTraversal(node, callback, this.lefts, this.rights); + } + else { + this.postOrderTraversal(node, callback, this.rights, this.lefts); + } + } + + private boolean postOrderTraversal(int node, QuantileDigest.Callback callback, int[] lefts, int[] rights) + { + if (node == -1) { + return false; + } + else { + int first = lefts[node]; + int second = rights[node]; + if (first != -1 && !this.postOrderTraversal(first, callback, lefts, rights)) { + return false; + } + else { + return second != -1 && !this.postOrderTraversal(second, callback, lefts, rights) ? false : callback.process(node); + } + } + } + + public double getConfidenceFactor() + { + return this.computeMaxPathWeight(this.root) * 1.0D / this.weightedCount; + } + + @VisibleForTesting + boolean equivalent(QuantileDigest other) + { + return this.getNodeCount() == other.getNodeCount() && this.min == other.min && this.max == other.max && this.weightedCount == other.weightedCount && this.alpha == other.alpha; + } + + private void rescaleToCommonLandmark(QuantileDigest one, QuantileDigest two) + { + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(this.ticker.read()); + long targetLandmark = Math.max(one.landmarkInSeconds, two.landmarkInSeconds); + if (nowInSeconds - targetLandmark >= 50L) { + targetLandmark = nowInSeconds; + } + + if (targetLandmark != one.landmarkInSeconds) { + one.rescale(targetLandmark); + } + + if (targetLandmark != two.landmarkInSeconds) { + two.rescale(targetLandmark); + } + } + + private double computeMaxPathWeight(int node) + { + if (node != -1 && this.levels[node] != 0) { + double leftMaxWeight = this.computeMaxPathWeight(this.lefts[node]); + double rightMaxWeight = this.computeMaxPathWeight(this.rights[node]); + return Math.max(leftMaxWeight, rightMaxWeight) + this.counts[node]; + } + else { + return 0.0D; + } + } + + @VisibleForTesting + void validate() + { + AtomicDouble sum = new AtomicDouble(); + AtomicInteger nodeCount = new AtomicInteger(); + Set freeSlots = this.computeFreeList(); + Preconditions.checkState(freeSlots.size() == this.freeCount, "Free count (%s) doesn't match actual free slots: %s", this.freeCount, freeSlots.size()); + if (this.root != -1) { + this.validateStructure(this.root, freeSlots); + this.postOrderTraversal(this.root, (node) -> { + sum.addAndGet(this.counts[node]); + nodeCount.incrementAndGet(); + return true; + }); + } + + Preconditions.checkState(Math.abs(sum.get() - this.weightedCount) < 1.0E-5D, "Computed weight (%s) doesn't match summary (%s)", sum.get(), this.weightedCount); + Preconditions.checkState(nodeCount.get() == this.getNodeCount(), "Actual node count (%s) doesn't match summary (%s)", 
nodeCount.get(), this.getNodeCount()); + } + + private void validateStructure(int node, Set freeNodes) + { + Preconditions.checkState(this.levels[node] >= 0); + Preconditions.checkState(!freeNodes.contains(node), "Node is in list of free slots: %s", node); + if (this.lefts[node] != -1) { + this.validateBranchStructure(node, this.lefts[node], this.rights[node], true); + this.validateStructure(this.lefts[node], freeNodes); + } + + if (this.rights[node] != -1) { + this.validateBranchStructure(node, this.rights[node], this.lefts[node], false); + this.validateStructure(this.rights[node], freeNodes); + } + } + + private void validateBranchStructure(int parent, int child, int otherChild, boolean isLeft) + { + Preconditions.checkState(this.levels[child] < this.levels[parent], "Child level (%s) should be smaller than parent level (%s)", this.levels[child], this.levels[parent]); + long branch = this.values[child] & 1L << this.levels[parent] - 1; + Preconditions.checkState(branch == 0L && isLeft || branch != 0L && !isLeft, "Value of child node is inconsistent with its branch"); + Preconditions.checkState(this.counts[parent] > 0.0D || this.counts[child] > 0.0D || otherChild != -1, "Found a linear chain of zero-weight nodes"); + } + + private Set computeFreeList() + { + Set freeSlots = new HashSet(); + + for (int index = this.firstFree; index != -1; index = this.lefts[index]) { + freeSlots.add(index); + } + + return freeSlots; + } + + public String toGraphviz() + { + StringBuilder builder = new StringBuilder(); + builder.append("digraph QuantileDigest {\n").append("\tgraph [ordering=\"out\"];"); + List nodes = new ArrayList(); + this.postOrderTraversal(this.root, (nodex) -> { + nodes.add(nodex); + return true; + }); + Multimap nodesByLevel = Multimaps.index(nodes, (input) -> { + return this.levels[input]; + }); + Iterator var4 = nodesByLevel.asMap().entrySet().iterator(); + + while (var4.hasNext()) { + Entry> entry = (Entry) var4.next(); + builder.append("\tsubgraph level_" + entry.getKey() + " {\n").append("\t\trank = same;\n"); + Iterator var6 = ((Collection) entry.getValue()).iterator(); + + while (var6.hasNext()) { + int node = (Integer) var6.next(); + if (this.levels[node] == 0) { + builder.append(String.format("\t\t%s [label=\"%s:[%s]@%s\\n%s\", shape=rect, style=filled,color=%s];\n", idFor(node), node, this.lowerBound(node), this.levels[node], this.counts[node], this.counts[node] > 0.0D ? "salmon2" : "white")); + } + else { + builder.append(String.format("\t\t%s [label=\"%s:[%s..%s]@%s\\n%s\", shape=rect, style=filled,color=%s];\n", idFor(node), node, this.lowerBound(node), this.upperBound(node), this.levels[node], this.counts[node], this.counts[node] > 0.0D ? "salmon2" : "white")); + } + } + + builder.append("\t}\n"); + } + + var4 = nodes.iterator(); + + while (var4.hasNext()) { + int node = (Integer) var4.next(); + if (this.lefts[node] != -1) { + builder.append(String.format("\t%s -> %s [style=\"%s\"];\n", idFor(node), idFor(this.lefts[node]), this.levels[node] - this.levels[this.lefts[node]] == 1 ? "solid" : "dotted")); + } + + if (this.rights[node] != -1) { + builder.append(String.format("\t%s -> %s [style=\"%s\"];\n", idFor(node), idFor(this.rights[node]), this.levels[node] - this.levels[this.rights[node]] == 1 ? 
"solid" : "dotted")); + } + } + + builder.append("}\n"); + return builder.toString(); + } + + private static String idFor(int node) + { + return String.format("node_%x", node); + } + + private static long longToBits(long value) + { + return value ^ -9223372036854775808L; + } + + private static long bitsToLong(long bits) + { + return bits ^ -9223372036854775808L; + } + + private long getBranchMask(byte level) + { + return 1L << level - 1; + } + + private long upperBound(int node) + { + long mask = 0L; + if (this.levels[node] > 0) { + mask = -1L >>> 64 - this.levels[node]; + } + + return bitsToLong(this.values[node] | mask); + } + + private long lowerBound(int node) + { + long mask = 0L; + if (this.levels[node] > 0) { + mask = -1L >>> 64 - this.levels[node]; + } + + return bitsToLong(this.values[node] & ~mask); + } + + private long middle(int node) + { + long lower = this.lowerBound(node); + long upper = this.upperBound(node); + return lower + (upper - lower) / 2L; + } + + private static Ticker noOpTicker() + { + return new Ticker() + { + public long read() + { + return 0L; + } + }; + } + + public interface MiddleFunction + { + QuantileDigest.MiddleFunction DEFAULT = (lowerBound, upperBound) -> { + return (double) lowerBound + (double) (upperBound - lowerBound) / 2.0D; + }; + + double middle(long lowerBound, long upperBound); + } + + public interface TruncMeanValueFunction + { + QuantileDigest.TruncMeanValueFunction DEFAULT = value -> value; + + double value(long value); + } + + private static class Flags + { + public static final int HAS_LEFT = 1; + public static final int HAS_RIGHT = 2; + public static final byte FORMAT = 0; + + private Flags() + { + } + } + + private interface Callback + { + boolean process(int node); + } + + public static class Bucket + { + private double count; + private double mean; + + public Bucket(double count, double mean) + { + this.count = count; + this.mean = mean; + } + + public double getCount() + { + return this.count; + } + + public double getMean() + { + return this.mean; + } + + public boolean equals(Object o) + { + if (this == o) { + return true; + } + else if (o != null && this.getClass() == o.getClass()) { + QuantileDigest.Bucket bucket = (QuantileDigest.Bucket) o; + if (Double.compare(bucket.count, this.count) != 0) { + return false; + } + else { + return Double.compare(bucket.mean, this.mean) == 0; + } + } + else { + return false; + } + } + + public int hashCode() + { + long temp = this.count != 0.0D ? Double.doubleToLongBits(this.count) : 0L; + int result = (int) (temp ^ temp >>> 32); + temp = this.mean != 0.0D ? 
Double.doubleToLongBits(this.mean) : 0L; + result = 31 * result + (int) (temp ^ temp >>> 32); + return result; + } + + public String toString() + { + return String.format("[count: %f, mean: %f]", this.count, this.mean); + } + } + + private static final class HistogramBuilderStateHolder + { + double sum; + double lastSum; + double bucketWeightedSum; + + private HistogramBuilderStateHolder() + { + } + } + + private static enum TraversalOrder + { + FORWARD, + REVERSE; + + private TraversalOrder() + { + } + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/operator/scalar/TestTDigest.java b/presto-main/src/test/java/com/facebook/presto/operator/scalar/TestTDigest.java new file mode 100644 index 0000000000000..c0753b697af92 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/operator/scalar/TestTDigest.java @@ -0,0 +1,411 @@ +package com.facebook.presto.operator.scalar; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.math3.distribution.BinomialDistribution; +import org.apache.commons.math3.distribution.GeometricDistribution; +import org.apache.commons.math3.distribution.NormalDistribution; +import org.apache.commons.math3.distribution.PoissonDistribution; +import org.apache.commons.math3.distribution.UniformIntegerDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class TestTDigest +{ + private static final int NUMBER_OF_ENTRIES = 5_000_000; + private static final int STANDARD_COMPRESSION_FACTOR = 100; + private static final int MINIMUM_DISCRETE_ERROR = 1; + private static final double STANDARD_ERROR = 0.01; + + @Test + public void testAddElementsInOrder() + { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + tDigest.add(i); + } + + for (int i = 0; i < quantile.length; i++) { + assertEquals(tDigest.quantile(quantile[i]), NUMBER_OF_ENTRIES * quantile[i], NUMBER_OF_ENTRIES * STANDARD_ERROR); + } + } + + @Test + public void testMergeTwoDistributionsWithoutOverlap() + { + TDigest tDigest1 = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + TDigest tDigest2 = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) { + tDigest1.add(i); + tDigest2.add(i + NUMBER_OF_ENTRIES / 2); + } + + tDigest1.add(tDigest2); + + for (int i = 0; i < quantile.length; i++) { + assertEquals(tDigest1.quantile(quantile[i]), NUMBER_OF_ENTRIES * quantile[i], NUMBER_OF_ENTRIES * STANDARD_ERROR); + } + } + + @DataProvider(name = "compressionFactors") + public Object[][] dataProvider() + { + return new Object[][] {{100}, {500}, {1000}}; + } + + @Test(dataProvider = "compressionFactors") + public void testMergeTwoDistributionsWithOverlap(int compressionFactor) + { + TDigest tDigest1 = TDigest.createMergingDigest(compressionFactor); + TDigest tDigest2 
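// Both digests below receive the identical value stream, so after the merge the combined
// digest holds every value twice; its quantile estimates are then compared against the
// exact order statistics of the duplicated, sorted list with an absolute tolerance of
// NUMBER_OF_ENTRIES * STANDARD_ERROR.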
= TDigest.createMergingDigest(compressionFactor); + List list = new ArrayList(); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + double[] quantileExtremes = {0.0001, 0.02000, 0.0300, 0.0400, 0.0500, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) { + tDigest1.add(i); + tDigest2.add(i); + list.add((double) i); + list.add((double) i); + } + + tDigest2.add(ImmutableList.of(tDigest1)); + Collections.sort(list); + + for (int i = 0; i < quantile.length; i++) { + assertEquals(tDigest2.quantile(quantile[i]), list.get((int) (NUMBER_OF_ENTRIES * quantile[i])), NUMBER_OF_ENTRIES * STANDARD_ERROR); + } + } + + @Test (enabled = false) + public void testAddElementsRandomized() + { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + List list = new ArrayList(); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + double value = Math.random() * NUMBER_OF_ENTRIES; + tDigest.add(value); + list.add(value); + } + + Collections.sort(list); + + for (int i = 0; i < quantile.length; i++) { + assertEquals(tDigest.quantile(quantile[i]), list.get((int) (NUMBER_OF_ENTRIES * quantile[i])), NUMBER_OF_ENTRIES * STANDARD_ERROR); + } + } + + @Test + public void testNormalDistributionLowVariance() + { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + List list = new ArrayList(); + NormalDistribution normal = new NormalDistribution(1000, 1); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + double value = normal.sample(); + tDigest.add(value); + list.add(value); + } + Collections.sort(list); + for (int i = 0; i < quantile.length; i++) { + assertEquals(tDigest.quantile(quantile[i]), list.get((int) (NUMBER_OF_ENTRIES * quantile[i])), list.get((int) (NUMBER_OF_ENTRIES * quantile[i])) * STANDARD_ERROR); + } + } + + @Test(invocationCount = 10) + public void testNormalDistributionHighVariance() + { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + QuantileDigest qDigest = new QuantileDigest(STANDARD_ERROR); + List list = new ArrayList(); + NormalDistribution normal = new NormalDistribution(0, 1); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + double value = normal.sample(); + tDigest.add(value); + qDigest.add((long) value); + list.add(value); + } + Collections.sort(list); + for (int i = 0; i < quantile.length; i++) { + System.out.println("quantile:" + quantile[i] + " value:" + list.get((int) (NUMBER_OF_ENTRIES * quantile[i])) + " t-digest:" + tDigest.quantile(quantile[i])); + assertContinuousWithinBound(quantile[i], STANDARD_ERROR, list, tDigest); + } + } + + @Test(dataProvider = "compressionFactors") + public void testMergeTwoNormalDistributions(int compressionFactor) + { + TDigest tDigest1 = TDigest.createMergingDigest(compressionFactor); + TDigest tDigest2 = TDigest.createMergingDigest(compressionFactor); + List list = new 
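// Each digest receives an independent half of the N(500, 20) sample; after the merge the
// estimate for quantile q must fall between the exact sample quantiles at ranks q - 0.01
// and q + 0.01 (see assertContinuousWithinBound), i.e. the tolerance is on rank rather
// than on value.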
ArrayList<>(); + NormalDistribution normal = new NormalDistribution(500, 20); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + double[] quantileExtremes = {0.0001, 0.02000, 0.0300, 0.0400, 0.0500, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES / 2; i++) { + double value1 = normal.sample(); + double value2 = normal.sample(); + tDigest1.add(value1); + tDigest2.add(value2); + list.add(value1); + list.add(value2); + } + + tDigest1.add(tDigest2); + Collections.sort(list); + + for (int i = 0; i < quantile.length; i++) { + assertContinuousWithinBound(quantile[i], STANDARD_ERROR, list, tDigest1); + } + } + @Test(dataProvider = "compressionFactors") + public void testMergeManySmallNormalDistributions(int compressionFactor) + { + TDigest tDigest = TDigest.createMergingDigest(compressionFactor); + List list = new ArrayList<>(); + NormalDistribution normal = new NormalDistribution(500, 20); + int digests = 100_000; + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + double[] quantileExtremes = {0.0001, 0.02000, 0.0300, 0.0400, 0.0500, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int k = 0; k < digests; k++) { + TDigest current = TDigest.createMergingDigest(compressionFactor); + for (int i = 0; i < 10; i++) { + double value = normal.sample(); + current.add(value); + list.add(value); + } + tDigest.add(current); + } + Collections.sort(list); + for (int i = 0; i < quantile.length; i++) { + assertContinuousWithinBound(quantile[i], STANDARD_ERROR, list, tDigest); + } + } + + @Test(dataProvider = "compressionFactors") + public void testMergeManyLargeNormalDistributions(int compressionFactor) + { + TDigest tDigest = TDigest.createMergingDigest(compressionFactor); + List list = new ArrayList<>(); + NormalDistribution normal = new NormalDistribution(500, 20); + int digests = 1000; + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + double[] quantileExtremes = {0.0001, 0.02000, 0.0300, 0.0400, 0.0500, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int k = 0; k < digests; k++) { + TDigest current = TDigest.createMergingDigest(compressionFactor); + for (int i = 0; i < NUMBER_OF_ENTRIES / digests; i++) { + double value = normal.sample(); + current.add(value); + list.add(value); + } + tDigest.add(current); + } + Collections.sort(list); + for (int i = 0; i < quantile.length; i++) { + assertContinuousWithinBound(quantile[i], STANDARD_ERROR, list, tDigest); + } + } + + @Test(dataProvider = "compressionFactors") + public void testCompareAccuracy(int compressionFactor) + { + TDigest tDigest = TDigest.createMergingDigest(compressionFactor); + QuantileDigest qDigest = new QuantileDigest(STANDARD_ERROR); + List values = new ArrayList<>(); + //NormalDistribution normal = new NormalDistribution(500, 20); + UniformRealDistribution normal = new UniformRealDistribution(0, 1_000_000_000); + UniformIntegerDistribution uniformReal = new UniformIntegerDistribution(0, 1_000_000_000); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + double[] quantileExtremes = {0.0001, 0.0005, 
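// This benchmark feeds the same uniform sample on [0, 1e9] to a t-digest and a q-digest,
// maps each estimate back through the distribution's CDF, prints the resulting rank error
// (as a percentage) together with the serialized size of each digest, and counts the
// t-digest as a success when it is at least as accurate as the q-digest or within
// STANDARD_ERROR of the requested rank.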
0.0010, 0.0050, 0.0100, 0.9900, 0.9950, 0.9990, 0.9995, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + double value = normal.sample(); + qDigest.add((long) value); + tDigest.add(value); + values.add(value); + } + int success = 0; + int total = 0; + Collections.sort(values); + for (int i = 0; i < quantile.length; i++) { + double errorQuantileT = normal.cumulativeProbability(tDigest.quantile(quantile[i])); + errorQuantileT = (double) Math.round(errorQuantileT * 10000d) / 10000d; + + double errorQuantileQ = normal.cumulativeProbability(qDigest.getQuantile(quantile[i])); + errorQuantileQ = (double) Math.round(errorQuantileQ * 10000d) / 10000d; + + // System.out.println(quantile[i] + " " + errorQuantile); + + System.out.println("T" + " size: " + tDigest.byteSize() + " quantile: " + String.format("%.4f", quantile[i]) + " error: " + String.format("%.4f", 100 * Math.abs(errorQuantileT - quantile[i]))); + System.out.println("Q" + " size: " + qDigest.estimatedSerializedSizeInBytes() + " quantile: " + String.format("%.4f", quantile[i]) + " %error: " + + String.format("%.4f", 100 * Math.abs(errorQuantileQ - quantile[i]))); + + /*double tError = Math.abs(values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) - tDigest.quantile(quantile[i])) / values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) * 100; + double qError = Math.abs(values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) - qDigest.getQuantile(quantile[i])) / values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) * 100; + double error = Math.abs(values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) - normal.inverseCumulativeProbability(quantile[i])) / values.get((int) (quantile[i] * NUMBER_OF_ENTRIES)) * 100; + //System.out.println(error); + System.out.println("T" + " size: " + tDigest.byteSize() + " quantile: " + String.format("%.4f", quantile[i]) + " %error: " + String.format("%.15f", tError)); + System.out.println("Q" + " size: " + qDigest.estimatedSerializedSizeInBytes() + " quantile: " + String.format("%.4f", quantile[i]) + " %error: " + + String.format("%.15f", qError)); + */ + + if (Math.abs(errorQuantileT - quantile[i]) < Math.abs(errorQuantileQ - quantile[i]) || Math.abs(errorQuantileT - quantile[i]) < STANDARD_ERROR) { + success++; + } + total++; + } + System.out.println("correct:" + success + " total:" + total + " %:" + (double) success / (double) total * 100); + } + + @Test(enabled = false) + public void testBinomialDistribution() + { + int trials = 10; + for (int k = 1; k < trials; k++) { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + BinomialDistribution binomial = new BinomialDistribution(trials, k * 0.1); + List list = new ArrayList<>(); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000, + 0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999}; + + for (int i = 0; i < NUMBER_OF_ENTRIES; i++) { + int sample = binomial.sample(); + tDigest.add(sample); + list.add(sample); + } + + Collections.sort(list); + + for (int i = 0; i < quantile.length; i++) { + assertDiscreteWithinBound(quantile[i], STANDARD_ERROR, list, tDigest); + } + } + } + + @Test(enabled = false) + public void testGeometricDistribution() + { + int trials = 10; + for (int k = 1; k < trials; k++) { + TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR); + GeometricDistribution geometric = new GeometricDistribution(k * 0.1); + List list = new ArrayList(); + double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 
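// For the discrete distributions the samples are integers, so the estimate is rounded with
// Math.rint before the rank-based bound check in assertDiscreteWithinBound.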
0.4000, 0.5000, 0.6000, 0.7000, 0.8000,
+                    0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999};
+
+            for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
+                int sample = geometric.sample();
+                tDigest.add(sample);
+                list.add(sample);
+            }
+
+            Collections.sort(list);
+
+            for (int i = 0; i < quantile.length; i++) {
+                assertDiscreteWithinBound(quantile[i], STANDARD_ERROR, list, tDigest);
+            }
+        }
+    }
+
+    @Test
+    public void testPoissonDistribution()
+    {
+        int trials = 10;
+        for (int k = 1; k < trials; k++) {
+            TDigest tDigest = TDigest.createMergingDigest(STANDARD_COMPRESSION_FACTOR);
+            PoissonDistribution poisson = new PoissonDistribution(k * 0.1);
+            List<Integer> list = new ArrayList<>();
+            double[] quantile = {0.0001, 0.0200, 0.0300, 0.04000, 0.0500, 0.1000, 0.2000, 0.3000, 0.4000, 0.5000, 0.6000, 0.7000, 0.8000,
+                    0.9000, 0.9500, 0.9600, 0.9700, 0.9800, 0.9999};
+
+            for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
+                int sample = poisson.sample();
+                tDigest.add(sample);
+                list.add(sample);
+            }
+
+            Collections.sort(list);
+
+            for (int i = 0; i < quantile.length; i++) {
+                assertDiscreteWithinBound(quantile[i], STANDARD_ERROR, list, tDigest);
+            }
+        }
+    }
+
+    private void assertContinuousWithinBound(double quantile, double bound, List<Double> values, TDigest tDigest)
+    {
+        double lowerBound = quantile - bound;
+        double upperBound = quantile + bound;
+
+        if (lowerBound < 0) {
+            lowerBound = tDigest.getMin();
+        }
+        else {
+            lowerBound = values.get((int) (NUMBER_OF_ENTRIES * lowerBound));
+        }
+
+        if (upperBound >= 1) {
+            upperBound = tDigest.getMax();
+        }
+        else {
+            upperBound = values.get((int) (NUMBER_OF_ENTRIES * upperBound));
+        }
+
+        assertTrue(tDigest.quantile(quantile) >= lowerBound, String.format("Value %s is below the lower bound %s", tDigest.quantile(quantile), lowerBound));
+        assertTrue(tDigest.quantile(quantile) <= upperBound, String.format("Value %s is above the upper bound %s", tDigest.quantile(quantile), upperBound));
+    }
+
+    private void assertDiscreteWithinBound(double quantile, double bound, List<Integer> values, TDigest tDigest)
+    {
+        double lowerBound = quantile - bound;
+        double upperBound = quantile + bound;
+
+        if (lowerBound < 0) {
+            lowerBound = tDigest.getMin();
+        }
+        else {
+            lowerBound = values.get((int) (NUMBER_OF_ENTRIES * lowerBound));
+        }
+
+        if (upperBound >= 1) {
+            upperBound = tDigest.getMax();
+        }
+        else {
+            upperBound = values.get((int) (NUMBER_OF_ENTRIES * upperBound));
+        }
+
+        assertTrue(Math.rint(tDigest.quantile(quantile)) >= lowerBound, String.format("Value %s is below the lower bound %s", tDigest.quantile(quantile), lowerBound));
+        assertTrue(Math.rint(tDigest.quantile(quantile)) <= upperBound, String.format("Value %s is above the upper bound %s", tDigest.quantile(quantile), upperBound));
+    }
+}
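The rank-based tolerance used by assertContinuousWithinBound and assertDiscreteWithinBound can be illustrated in isolation: for a requested quantile q and tolerance eps, an estimate is accepted if it lies between the exact sample quantiles at ranks q - eps and q + eps, falling back to the sample minimum or maximum when the shifted rank leaves [0, 1). The sketch below is illustrative only and assumes nothing beyond the JDK; the class and method names (RankBoundCheck, withinRankBound) are not part of this patch.

import java.util.Arrays;
import java.util.Random;

// Illustrative sketch of the rank-based acceptance window used by the tests above.
public final class RankBoundCheck
{
    private RankBoundCheck() {}

    // Accepts `estimate` for quantile q when it falls between the exact sample quantiles
    // at ranks (q - eps) and (q + eps); out-of-range ranks clamp to the sample min/max.
    public static boolean withinRankBound(double[] sortedValues, double q, double eps, double estimate)
    {
        int n = sortedValues.length;
        double lowerRank = q - eps;
        double upperRank = q + eps;
        double lower = lowerRank < 0 ? sortedValues[0] : sortedValues[(int) (n * lowerRank)];
        double upper = upperRank >= 1 ? sortedValues[n - 1] : sortedValues[(int) (n * upperRank)];
        return estimate >= lower && estimate <= upper;
    }

    public static void main(String[] args)
    {
        Random random = new Random(42);
        double[] values = new double[1_000_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = random.nextDouble();
        }
        Arrays.sort(values);
        // For uniform data the exact median is close to 0.5; with eps = 0.01 any estimate
        // between the 49th and 51st percentile of the sample is accepted.
        System.out.println(withinRankBound(values, 0.5, 0.01, values[values.length / 2]));
    }
}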