Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,37 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
final class DataFrameStatFunctions private[sql](df: DataFrame) {

/**
* Calculate the approximate quantile of numerical column of a DataFrame.
* @param col the name of the column
* @param quantile the quantile number
* @return the approximate quantile
* Calculates the approximate quantiles of a numerical column of a DataFrame.
*
* The result of this algorithm has the following deterministic bound:
* If the DataFrame has N elements and if we request the quantile at probability `p` up to error
* `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` is close to (p * N).
* More precisely,
*
* floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
*
* This method implements a variation of the Greenwald-Khanna algorithm (with some speed
* optimizations).
* The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient
* Online Computation of Quantile Summaries]] by Greenwald and Khanna.
*
* @param col the name of the numerical column
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
* @return the approximate quantiles at the given probabilities
*
* @since 2.0.0
*/
def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = {
StatFunctions.approxQuantile(df, col, quantile, epsilon)
def approxQuantile(
col: String,
probabilities: Array[Double],
relativeError: Double): Array[Double] = {
StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, relativeError).head.toArray
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.stat

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
Expand All @@ -33,59 +32,37 @@ private[sql] object StatFunctions extends Logging {
import QuantileSummaries.Stats

/**
* Calculates the approximate quantile for the given column.
*
* If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]]
*
* Note on the target error.
* Calculates the approximate quantiles of multiple numerical columns of a DataFrame in one pass.
*
* The result of this algorithm has the following deterministic bound:
* if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
* then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` close to (phi * N). More precisely:
*
* floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
*
* Note on the algorithm used.
* If the DataFrame has N elements and if we request the quantile at probability `p` up to error
* `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` is close to (p * N).
* More precisely,
*
* This method implements a variation of the Greenwald-Khanna algorithm
* (with some speed optimizations). The algorithm was first present in the following article:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
* floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
*
* The performance optimizations are detailed in the comments of the implementation.
*
* @param df the dataframe to estimate quantiles on
* @param col the name of the column
* @param quantile the target quantile of interest
* @param epsilon the target error. Should be >= 0.
* */
def approxQuantile(
df: DataFrame,
col: String,
quantile: Double,
epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range of (0.0, 1.0).")
val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
res
}

/**
* Runs multiple quantile computations in a single pass, with the same target error.
*
* See [[approxQuantile)]] for more details on the approximation guarantees.
* This method implements a variation of the Greenwald-Khanna algorithm (with some speed
* optimizations).
* The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient
* Online Computation of Quantile Summaries]] by Greenwald and Khanna.
*
* @param df the dataframe
* @param cols columns of the dataframe
* @param quantiles target quantiles to compute
* @param epsilon the precision to achieve
* @param cols numerical columns of the dataframe
* @param probabilities a list of quantile probabilities
* Each number must belong to [0, 1].
* For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
* @param relativeError The relative target precision to achieve (>= 0).
* If set to zero, the exact quantiles are computed, which could be very expensive.
* Note that values greater than 1 are accepted but give the same result as 1.
*
* @return for each column, returns the requested approximations
*/
def multipleApproxQuantiles(
df: DataFrame,
cols: Seq[String],
quantiles: Seq[Double],
epsilon: Double): Seq[Seq[Double]] = {
probabilities: Seq[Double],
relativeError: Double): Seq[Seq[Double]] = {
val columns: Seq[Column] = cols.map { colName =>
val field = df.schema(colName)
require(field.dataType.isInstanceOf[NumericType],
Expand All @@ -94,7 +71,7 @@ private[sql] object StatFunctions extends Logging {
Column(Cast(Column(colName).expr, DoubleType))
}
val emptySummaries = Array.fill(cols.size)(
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError))

// Note that it works more or less by accident as `rdd.aggregate` is not a pure function:
// this function returns the same array as given in the input (because `aggregate` reuses
Expand All @@ -115,40 +92,49 @@ private[sql] object StatFunctions extends Logging {
}
val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)

summaries.map { summary => quantiles.map(summary.query) }
summaries.map { summary => probabilities.map(summary.query) }
}

/**
* Helper class to compute approximate quantile summary.
* This implementation is based on the algorithm proposed in the paper:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
* and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
*
* In order to optimize for speed, it maintains an internal buffer of the last seen samples,
* and only inserts them after crossing a certain size threshold. This guarantees a near-constant
* runtime complexity compared to the original algorithm.
*
* @param compressThreshold the compression threshold: after the internal buffer of statistics
* crosses this size, it attempts to compress the statistics together
* @param epsilon the target precision
* @param sampled a buffer of quantile statistics. See the G-K article for more details
* @param compressThreshold the compression threshold.
* After the internal buffer of statistics crosses this size, it attempts to compress the
* statistics together.
* @param relativeError the target relative error.
* It is uniform across the complete range of values.
* @param sampled a buffer of quantile statistics.
* See the G-K article for more details.
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
* @param headSampled a buffer of latest samples seen so far
*/
class QuantileSummaries(
val compressThreshold: Int,
val epsilon: Double,
val relativeError: Double,
val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
private[stat] var count: Long = 0L,
val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {

import QuantileSummaries._

/**
* Returns a summary with the given observation inserted into the summary.
* This method may either modify in place the current summary (and return the same summary,
* modified in place), or it may create a new summary from scratch it necessary.
* @param x the new observation to insert into the summary
*/
def insert(x: Double): QuantileSummaries = {
headSampled.append(x)
if (headSampled.size >= defaultHeadSize) {
this.withHeadInserted
this.withHeadBufferInserted
} else {
this
}
Expand All @@ -162,7 +148,7 @@ private[sql] object StatFunctions extends Logging {
*
* @return a new quantile summary object.
*/
private def withHeadInserted: QuantileSummaries = {
private def withHeadBufferInserted: QuantileSummaries = {
if (headSampled.isEmpty) {
return this
}
Expand All @@ -187,7 +173,7 @@ private[sql] object StatFunctions extends Logging {
if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
0
} else {
math.floor(2 * epsilon * currentCount).toInt
math.floor(2 * relativeError * currentCount).toInt
}

val tuple = Stats(currentSample, 1, delta)
Expand All @@ -200,67 +186,80 @@ private[sql] object StatFunctions extends Logging {
newSamples.append(sampled(sampleIdx))
sampleIdx += 1
}
new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount)
new QuantileSummaries(compressThreshold, relativeError, newSamples, currentCount)
}

/**
* Returns a new summary that compresses the summary statistics and the head buffer.
*
* This implements the COMPRESS function of the GK algorithm. It does not modify the object.
*
* @return a new summary object with compressed statistics
*/
def compress(): QuantileSummaries = {
// Inserts all the elements first
val inserted = this.withHeadInserted
val inserted = this.withHeadBufferInserted
assert(inserted.headSampled.isEmpty)
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
}

private def shallowCopy: QuantileSummaries = {
new QuantileSummaries(compressThreshold, relativeError, sampled, count, headSampled)
}

/**
* Merges two (compressed) summaries together.
*
* Returns a new summary.
*/
def merge(other: QuantileSummaries): QuantileSummaries = {
require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
if (other.count == 0) {
this
this.shallowCopy
} else if (count == 0) {
other
other.shallowCopy
} else {
// We rely on the fact that they are ordered to efficiently interleave them.
val thisSampled = sampled.toList
val otherSampled = other.sampled.toList
val res: ArrayBuffer[Stats] = ArrayBuffer.empty

@tailrec
def mergeCurrent(
thisList: List[Stats],
otherList: List[Stats]): Unit = (thisList, otherList) match {
case (Nil, l) =>
res.appendAll(l)
case (l, Nil) =>
res.appendAll(l)
case (h1 :: t1, h2 :: t2) if h1.value > h2.value =>
mergeCurrent(otherList, thisList)
case (h1 :: t1, l) =>
// We know that h1.value <= all values in l
// TODO(thunterdb) do we need to adjust g and delta?
res.append(h1)
mergeCurrent(t1, l)
}

mergeCurrent(thisSampled, otherSampled)
val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count)
new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count)
// Merge the two buffers.
// The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
// statistics during the merging: the invariants are still respected after the merge.
// TODO: could replace full sort by ordered merge, the two lists are known to be sorted
// already.
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
other.compressThreshold, other.relativeError, comp, other.count + count)
}
}

/**
* Runs a query for a given quantile.
* The result follows the approximation guarantees detailed above.
* The query can only be run on a compressed summary: you need to call compress() before using
* it.
*
* @param quantile the target quantile
* @return
*/
def query(quantile: Double): Double = {
require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
require(headSampled.isEmpty,
"Cannot operate on an uncompressed summary, call compress() first")

if (quantile <= epsilon) {
if (quantile <= relativeError) {
return sampled.head.value
}

if (quantile >= 1 - epsilon) {
if (quantile >= 1 - relativeError) {
return sampled.last.value
}

// Target rank
val rank = math.ceil(quantile * count).toInt
val targetError = math.ceil(epsilon * count)
val targetError = math.ceil(relativeError * count)
// Minimum rank at current sample
var minRank = 0
var i = 1
Expand Down Expand Up @@ -291,9 +290,10 @@ private[sql] object StatFunctions extends Logging {
val defaultHeadSize: Int = 50000

/**
* The default value for epsilon.
* The default value for the relative error (1%).
* With this value, the best extreme percentiles that can be approximated are 1% and 99%.
*/
val defaultEpsilon: Double = 0.01
val defaultRelativeError: Double = 0.01

/**
* Statisttics from the Greenwald-Khanna paper.
Expand Down
Loading