Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
@Experimental
final class DataFrameStatFunctions private[sql](df: DataFrame) {

/**
* Calculate the approximate quantile of numerical column of a DataFrame.
* @param col the name of the column
* @param quantile the quantile number
* @return the approximate quantile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

epsilon is not documented. It might be better to call it relerr or relativeError because epsilon doesn't carry any information.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hard to think of a case when a user just needs a single quantile (maybe just median?). If we want to expose public APIs, I would suggest def approxQuantiles(col: String, quantiles: Array[Double], epsilon: Double): Array[Double]. I use Array[Double] here for Java compatibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, changing this to an Array return

StatFunctions.approxQuantile(df, col, quantile, epsilon)
}

/**
* Calculate the sample covariance of two numerical columns of a DataFrame.
* @param col1 the name of the first column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.sql.execution.stat

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
Expand All @@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String

private[sql] object StatFunctions extends Logging {

import QuantileSummaries.Stats

/**
* Calculates the approximate quantile for the given column.
*
* If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]]
*
* Note on the target error.
*
* The result of this algorithm has the following deterministic bound:
* if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
* then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
* of `x` close to (phi * N). More precisely:
*
* floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
*
* Note on the algorithm used.
*
* This method implements a variation of the Greenwald-Khanna algorithm
* (with some speed optimizations). The algorithm was first present in the following article:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in both places

*
* The performance optimizations are detailed in the comments of the implementation.
*
* @param df the dataframe to estimate quantiles on
* @param col the name of the column
* @param quantile the target quantile of interest
* @param epsilon the target error. Should be >= 0.
* */
def approxQuantile(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary. If we only have approxQuantiles in the public API, we can change quantile: Double to quantiles: Seq[Double] here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, removed

df: DataFrame,
col: String,
quantile: Double,
epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range of (0.0, 1.0).")
val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
res
}

/**
* Runs multiple quantile computations in a single pass, with the same target error.
*
* See [[approxQuantile)]] for more details on the approximation guarantees.
*
* @param df the dataframe
* @param cols columns of the dataframe
* @param quantiles target quantiles to compute
* @param epsilon the precision to achieve
* @return for each column, returns the requested approximations
*/
def multipleApproxQuantiles(
df: DataFrame,
cols: Seq[String],
quantiles: Seq[Double],
epsilon: Double): Seq[Seq[Double]] = {
val columns: Seq[Column] = cols.map { colName =>
val field = df.schema(colName)
require(field.dataType.isInstanceOf[NumericType],
s"Quantile calculation for column $colName with data type ${field.dataType}" +
" is not supported.")
Column(Cast(Column(colName).expr, DoubleType))
}
val emptySummaries = Array.fill(cols.size)(
new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))

// Note that it works more or less by accident as `rdd.aggregate` is not a pure function:
// this function returns the same array as given in the input (because `aggregate` reuses
// the same argument).
def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
var i = 0
while (i < summaries.length) {
summaries(i) = summaries(i).insert(row.getDouble(i))
i += 1
}
summaries
}

def merge(
sum1: Array[QuantileSummaries],
sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
}
val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)

summaries.map { summary => quantiles.map(summary.query) }
}

/**
* Helper class to compute approximate quantile summary.
* This implementation is based on the algorithm proposed in the paper:
* "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
* and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
*
* In order to optimize for speed, it maintains an internal buffer of the last seen samples,
* and only inserts them after crossing a certain size threshold. This guarantees a near-constant
* runtime complexity compared to the original algorithm.
*
* @param compressThreshold the compression threshold: after the internal buffer of statistics
* crosses this size, it attempts to compress the statistics together
* @param epsilon the target precision
* @param sampled a buffer of quantile statistics. See the G-K article for more details
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
* @param headSampled a buffer of latest samples seen so far
*/
class QuantileSummaries(
val compressThreshold: Int,
val epsilon: Double,
val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
private[stat] var count: Long = 0L,
val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {

import QuantileSummaries._

def insert(x: Double): QuantileSummaries = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Miss API doc, especially on the object returned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yes, thanks. This is an important one. I also added documentation to the other methods.

headSampled.append(x)
if (headSampled.size >= defaultHeadSize) {
this.withHeadInserted
} else {
this
}
}

/**
* Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse
* the summary statistics in a single batch.
*
* This method does not modify the current object and returns if necessary a new copy.
*
* @return a new quantile summary object.
*/
private def withHeadInserted: QuantileSummaries = {
if (headSampled.isEmpty) {
return this
}
var currentCount = count
val sorted = headSampled.toArray.sorted
val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
// The index of the next element to insert
var sampleIdx = 0
// The index of the sample currently being inserted.
var opsIdx: Int = 0
while(opsIdx < sorted.length) {
val currentSample = sorted(opsIdx)
// Add all the samples before the next observation.
while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
newSamples.append(sampled(sampleIdx))
sampleIdx += 1
}

// If it is the first one to insert, of if it is the last one
currentCount += 1
val delta =
if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
0
} else {
math.floor(2 * epsilon * currentCount).toInt
}

val tuple = Stats(currentSample, 1, delta)
newSamples.append(tuple)
opsIdx += 1
}

// Add all the remaining existing samples
while(sampleIdx < sampled.size) {
newSamples.append(sampled(sampleIdx))
sampleIdx += 1
}
new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount)
}

def compress(): QuantileSummaries = {
// Inserts all the elements first
val inserted = this.withHeadInserted
assert(inserted.headSampled.isEmpty)
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
}

def merge(other: QuantileSummaries): QuantileSummaries = {
if (other.count == 0) {
this
} else if (count == 0) {
other
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: In the contract of RDD.aggregate, merge should either modify the first value or create a new object for return.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks for the clarification. When merging, empty summaries would only indicate that a partition was empty, which should not happen often... I will return a shallow copy to make sure the contract is respected.

} else {
// We rely on the fact that they are ordered to efficiently interleave them.
val thisSampled = sampled.toList
val otherSampled = other.sampled.toList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how much speedup we can get by merging the two lists manually compared to (thisSampled ++ otherSampled).sorted. Did you run some tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the current implementation is too complicated, and that probably just merging/sorting the two arrays directly is more efficient for the size considered.

When running some performance testing, the cost of the algorithm was dominated by the cost of accessing the content of Rows. Only 4% of the running time was spent on insertion+merging, so this cost was negligible at this point.

I am going to do as you suggest. If it happens to be a bottleneck when we use UDAFs later, directly manipulating ArrayBuffers would be more efficient than pattern-matching on lists anyway. Rerunning the synthetic benchmark with the suggested changes did not yield runtime changes.

val res: ArrayBuffer[Stats] = ArrayBuffer.empty

@tailrec
def mergeCurrent(
thisList: List[Stats],
otherList: List[Stats]): Unit = (thisList, otherList) match {
case (Nil, l) =>
res.appendAll(l)
case (l, Nil) =>
res.appendAll(l)
case (h1 :: t1, h2 :: t2) if h1.value > h2.value =>
mergeCurrent(otherList, thisList)
case (h1 :: t1, l) =>
// We know that h1.value <= all values in l
// TODO(thunterdb) do we need to adjust g and delta?
res.append(h1)
mergeCurrent(t1, l)
}

mergeCurrent(thisSampled, otherSampled)
val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count)
new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count)
}
}

def query(quantile: Double): Double = {
require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")

if (quantile <= epsilon) {
return sampled.head.value
}

if (quantile >= 1 - epsilon) {
return sampled.last.value
}

// Target rank
val rank = math.ceil(quantile * count).toInt
val targetError = math.ceil(epsilon * count)
// Minimum rank at current sample
var minRank = 0
var i = 1
while (i < sampled.size - 1) {
val curSample = sampled(i)
minRank += curSample.g
val maxRank = minRank + curSample.delta
if (maxRank - targetError <= rank && rank <= minRank + targetError) {
return curSample.value
}
i += 1
}
sampled.last.value
}
}

object QuantileSummaries {
// TODO(tjhunter) more tuning could be done one the constants here, but for now
// the main cost of the algorithm is accessing the data in SQL.
/**
* The default value for the compression threshold.
*/
val defaultCompressThreshold: Int = 10000

/**
* The size of the head buffer.
*/
val defaultHeadSize: Int = 50000

/**
* The default value for epsilon.
*/
val defaultEpsilon: Double = 0.01

/**
* Statisttics from the Greenwald-Khanna paper.
* @param value the sampled value
* @param g the minimum rank jump from the previous value's minimum rank
* @param delta the maximum span of the rank.
*/
case class Stats(value: Double, g: Int, delta: Int)

private def compressImmut(
currentSamples: IndexedSeq[Stats],
mergeThreshold: Double): ArrayBuffer[Stats] = {
val res: ArrayBuffer[Stats] = ArrayBuffer.empty
if (currentSamples.isEmpty) {
return res
}
// Start for the last element, which is always part of the set.
// The head contains the current new head, that may be merged with the current element.
var head = currentSamples.last
var i = currentSamples.size - 2
// Do not compress the last element
while (i >= 1) {
// The current sample:
val sample1 = currentSamples(i)
// Do we need to compress?
if (sample1.g + head.g + head.delta < mergeThreshold) {
// Do not insert yet, just merge the current element into the head.
head = head.copy(g = head.g + sample1.g)
} else {
// Prepend the current head, and keep the current sample as target for merging.
res.prepend(head)
head = sample1
}
i -= 1
}
res.prepend(head)
// If necessary, add the minimum element:
res.prepend(currentSamples.head)
res
}
}

/** Calculate the Pearson Correlation Coefficient for the given columns */
private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
val counts = collectStatisticalData(df, cols, "correlation")
Expand Down
Loading