Skip to content
16 changes: 14 additions & 2 deletions docs/mllib-statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,16 @@ user tests against the normal distribution (`distName="norm"`), but does not pro
parameters, the test initializes to the standard normal distribution and logs an appropriate
message.

There is also a 2-sample, 2-sided implementation available, which tests the null hypothesis that the
2 samples are drawn from the same distribution. It is worth noting that the test assumes that all
elements are unique, both within and across the 2 samples, and thus no ranking ties should occur.
Given that the test is for continuous distributions this should not be an onerous requirement.

<div class="codetabs">
<div data-lang="scala" markdown="1">
[`Statistics`](api/scala/index.html#org.apache.spark.mllib.stat.Statistics$) provides methods to
run a 1-sample, 2-sided Kolmogorov-Smirnov test. The following example demonstrates how to run
and interpret the hypothesis tests.
run a 1-sample, 2-sided Kolmogorov-Smirnov test and a 2-sample, 2-sided Kolmogorov-Smirnov test.
The following example demonstrates how to run and interpret the hypothesis tests.

{% highlight scala %}
import org.apache.spark.SparkContext
Expand All @@ -452,6 +457,13 @@ println(testResult) // summary of the test including the p-value, test statistic
// perform a KS test using a cumulative distribution function of our making
val myCDF: Double => Double = ...
val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)

val data2: RDD[Double] = ... // another RDD of sample data
// run a KS test for data vs data 2
// this corresponds to a 2-sample test
// the statistic provides a test for the null hypothesis that both samples are drawn from the
// same distribution
val ksTestResult2 = Statistics.kolmogorovSmirnovTest2Sample(data, data2)
{% endhighlight %}
</div>
</div>
Expand Down
14 changes: 14 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,18 @@ object Statistics {
: KolmogorovSmirnovTestResult = {
KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
}

/**
 * Perform a two-sample, two-sided Kolmogorov-Smirnov test of the null hypothesis that both
 * samples are drawn from the same continuous probability distribution.
 * @param data1 `RDD[Double]` first sample of data
 * @param data2 `RDD[Double]` second sample of data
 * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] object containing test
 *         statistic, p-value, and null hypothesis
 */
def kolmogorovSmirnovTest2Sample(data1: RDD[Double], data2: RDD[Double])
  : KolmogorovSmirnovTestResult =
  KolmogorovSmirnovTest.testTwoSamples(data1, data2)

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
object NullHypothesis extends Enumeration {
  type NullHypothesis = Value
  // The Value strings double as the human-readable null hypothesis reported in test
  // results, so both are phrased consistently ("... follows the ... distribution").
  val OneSampleTwoSided = Value("Sample follows the theoretical distribution")
  val TwoSampleTwoSided = Value("Both samples follow the same distribution")
}

/**
Expand Down Expand Up @@ -190,5 +191,104 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
}

/**
 * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests whether the two
 * samples are drawn from the same distribution.
 * @param data1 `RDD[Double]` first sample of data
 * @param data2 `RDD[Double]` second sample of data
 * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
 *         statistic, p-value, and appropriate null hypothesis
 */
def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
  val n1 = data1.count().toDouble
  val n2 = data2.count().toDouble
  // tag every value with its sample of origin; the flag survives the global co-sort
  val fromFirstSample = true
  val pooled = data1.map((_, fromFirstSample)).union(data2.map((_, !fromFirstSample)))
  // co-sort the pooled values, then have each partition report its local extreme distance
  // candidates (plus its adjustment-numerator contribution) back to the driver
  val partitionExtrema = pooled
    .sortByKey()
    .mapPartitions(searchTwoSampleCandidates(_, n1, n2))
    .collect()
  // resolve the global extreme from the per-partition candidates
  val statistic = searchTwoSampleStatistic(partitionExtrema, n1 * n2)
  evalTwoSampleP(statistic, n1.toInt, n2.toInt)
}

/**
 * Calculates maximum distance candidates and counts of elements from each sample within one
 * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation.
 * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
 *        each element is additionally tagged with a boolean flag for sample 1 membership
 * @param n1 `Double` sample 1 size
 * @param n2 `Double` sample 2 size
 * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
 *         distance, the second is an unadjusted maximum distance (both of which will later
 *         be adjusted by a constant to account for elements in prior partitions), and the third
 *         is the numerator of the adjustment constant contributed by this partition, calculated
 *         as |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|.
 *         This comes from the fact that adjusting for prior partitions means adding the
 *         difference of fractions (|prior elements in sample 1| / |sample 1| -
 *         |prior elements in sample 2| / |sample 2|); each partition reports the numerator
 *         portion attributable to it so that following partitions can cumulatively adjust
 *         their values.
 */
private def searchTwoSampleCandidates(
    partData: Iterator[(Double, Boolean)],
    n1: Double,
    n2: Double): Iterator[(Double, Double, Double)] = {
  // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
  case class ExtremaAndIndices(min: Double, max: Double, ix1: Int, ix2: Int)
  val initAcc = ExtremaAndIndices(Double.MaxValue, Double.MinValue, 0, 0)
  // traverse the data in the partition and calculate distances and counts
  val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
    val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
    val cdf1 = (acc.ix1 + add1) / n1
    val cdf2 = (acc.ix2 + add2) / n2
    val dist = cdf1 - cdf2
    ExtremaAndIndices(
      math.min(acc.min, dist),
      math.max(acc.max, dist),
      acc.ix1 + add1, acc.ix2 + add2)
  }
  // An empty partition leaves the accumulator untouched. Emitting the untouched accumulator
  // would poison the global search with the Double.MaxValue / Double.MinValue sentinels, so
  // emit nothing for empty partitions instead.
  val results = if (pResults == initAcc) {
    Array[(Double, Double, Double)]()
  } else {
    // Adjustment numerator: |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in
    // partition|. Note there is deliberately no +1 offset on the counts: after the fold,
    // ix1/ix2 already equal the number of elements seen from each sample, and offsetting both
    // by one would add a spurious (n2 - n1) term per partition whenever the sample sizes
    // differ (the error is invisible when n1 == n2, which is why equal-size tests pass).
    Array((pResults.min, pResults.max, pResults.ix1 * n2 - pResults.ix2 * n1))
  }
  results.iterator
}

/**
 * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
 * the two-sample, two-sided Kolmogorov-Smirnov test statistic.
 * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
 *        partition, along with the numerator for the necessary constant adjustments
 * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1) * (size
 *        of sample 2))
 * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
 */
private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
  : Double = {
  // prefix sums of the per-partition numerators: element i holds the cumulative count
  // contributed by all partitions strictly before partition i
  val priorCounts = localData.scanLeft(0.0)(_ + _._3)
  // shift each partition's candidate extremes by its cumulative adjustment constant, which
  // yields the true (global) distances between the two empirical CDFs
  val adjustedDistances = localData.zip(priorCounts).map {
    case ((minCand, maxCand, _), priorCount) =>
      val adjConst = priorCount / n
      math.max(math.abs(minCand + adjConst), math.abs(maxCand + adjConst))
  }
  // Double.MinValue mirrors the fold seed so an empty input degrades identically
  adjustedDistances.foldLeft(Double.MinValue)(math.max)
}

/**
 * Computes the approximate two-sample p-value for a given KS statistic via commons-math and
 * packages it with the two-sample null hypothesis.
 * @param ksStat the two-sample Kolmogorov-Smirnov statistic
 * @param n size of sample 1
 * @param m size of sample 2
 * @return result object carrying the p-value, statistic, and null hypothesis
 */
private def evalTwoSampleP(ksStat: Double, n: Int, m: Int): KolmogorovSmirnovTestResult =
  new KolmogorovSmirnovTestResult(
    new CommonMathKolmogorovSmirnovTest().approximateP(ksStat, n, m),
    ksStat,
    NullHypothesis.TwoSampleTwoSided.toString)
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Random

import org.apache.commons.math3.distribution.{ExponentialDistribution,
NormalDistribution, UniformRealDistribution}
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors}
Expand Down Expand Up @@ -177,7 +177,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
val sampledUnif = sc.parallelize(unifDist.sample(n), 10)

// Use a apache math commons local KS test to verify calculations
val ksTest = new KolmogorovSmirnovTest()
val ksTest = new CommonMathKolmogorovSmirnovTest()
val pThreshold = 0.05

// Comparing a standard normal sample to a standard normal distribution
Expand Down Expand Up @@ -254,4 +254,150 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there weird edge cases here with the partitioning? E.g. where a partition has no elements? Or only elements from one sample? Can we provide tests for them?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The edge case of partitions with no elements does indeed exist, which is why we have that odd check for whether a result after folding in a partition matches the initial accumulator. The test involving nonOverlap1P and nonOverlap2P should test for situations where a partition has only 1 sample. I'll break these out into separate tests, since they probably deserve to be singled out.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to test for this than using this round-about method? Particularly since the functions that would likely run into an issue (e.g. KolmogorovSmirnovTest.searchTwoSampleCandidates are currently private). Or should I expose them (just make them package private) instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If making the methods package private would simplify the test, I think it's reasonable to do so

test("2 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
  // Verifies that the distributed 2-sample KS implementation agrees (statistic and p-value)
  // with the local apache commons math3 implementation, and that H0 is accepted/rejected as
  // expected for same-source vs. different-source samples.
  // NOTE(review): the distributions are not seeded, so the accept/reject assertions are
  // probabilistic — with n = 10000 the failure probability is negligible but nonzero.
  // Create theoretical distributions
  val stdNormalDist = new NormalDistribution(0, 1)
  val normalDist = new NormalDistribution(2, 3)
  val expDist = new ExponentialDistribution(0.6)

  // create data samples and parallelize
  val n = 10000
  // local copies (used for the reference commons-math computation)
  val sampledStdNorm1L = stdNormalDist.sample(n)
  val sampledStdNorm2L = stdNormalDist.sample(n)
  val sampledNormL = normalDist.sample(n)
  val sampledExpL = expDist.sample(n)
  // distributed copies of the exact same samples
  val sampledStdNorm1P = sc.parallelize(sampledStdNorm1L, 10)
  val sampledStdNorm2P = sc.parallelize(sampledStdNorm2L, 10)
  val sampledNormP = sc.parallelize(sampledNormL, 10)
  val sampledExpP = sc.parallelize(sampledExpL, 10)

  // Use apache math commons local KS test to verify calculations
  val ksTest = new CommonMathKolmogorovSmirnovTest()
  val pThreshold = 0.05

  // Comparing 2 samples from same standard normal distribution
  val result1 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledStdNorm2P)
  val refStat1 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledStdNorm2L)
  val refP1 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledStdNorm2L)
  assert(result1.statistic ~== refStat1 relTol 1e-4)
  assert(result1.pValue ~== refP1 relTol 1e-4)
  assert(result1.pValue > pThreshold) // accept H0

  // Comparing 2 samples from different normal distributions
  val result2 = Statistics.kolmogorovSmirnovTest2Sample(sampledStdNorm1P, sampledNormP)
  val refStat2 = ksTest.kolmogorovSmirnovStatistic(sampledStdNorm1L, sampledNormL)
  val refP2 = ksTest.kolmogorovSmirnovTest(sampledStdNorm1L, sampledNormL)
  assert(result2.statistic ~== refStat2 relTol 1e-4)
  assert(result2.pValue ~== refP2 relTol 1e-4)
  assert(result2.pValue < pThreshold) // reject H0

  // Comparing 1 sample from normal distribution to 1 sample from exponential distribution
  val result3 = Statistics.kolmogorovSmirnovTest2Sample(sampledNormP, sampledExpP)
  val refStat3 = ksTest.kolmogorovSmirnovStatistic(sampledNormL, sampledExpL)
  val refP3 = ksTest.kolmogorovSmirnovTest(sampledNormL, sampledExpL)
  assert(result3.statistic ~== refStat3 relTol 1e-4)
  assert(result3.pValue ~== refP3 relTol 1e-4)
  assert(result3.pValue < pThreshold) // reject H0
}

test("2 sample Kolmogorov-Smirnov test: R implementation equivalence") {
  // Verifies the distributed statistic and (approximate) p-value against values produced
  // by R's ks.test for two fixed 20-element standard normal samples (R session below).
  /*
  Comparing results with the R implementation of KS
  > sessionInfo()
  R version 3.2.0 (2015-04-16)
  Platform: x86_64-apple-darwin13.4.0 (64-bit)
  > set.seed(20)
  > v <- rnorm(20)
  > v2 <- rnorm(20)
  > v
  [1] 1.16268529 -0.58592447 1.78546500 -1.33259371 -0.44656677 0.56960612
  [7] -2.88971761 -0.86901834 -0.46170268 -0.55554091 -0.02013537 -0.15038222
  [13] -0.62812676 1.32322085 -1.52135057 -0.43742787 0.97057758 0.02822264
  [19] -0.08578219 0.38921440
  > v2
  [1] 0.23668737 -0.14444023 0.72222970 0.36990686 -0.24206631 -1.47206332
  [7] -0.59615955 -1.14670013 -2.47463643 -0.61350858 -0.21631151 1.59014577
  [13] 1.55614328 1.10845089 -1.09734184 -1.86060572 -0.91357885 1.24556891
  [19] 0.08785472 0.42348190
  */
  val rData1 = sc.parallelize(
    Array(
      1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
      -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
      -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
      -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
      0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
    )
  )

  val rData2 = sc.parallelize(
    Array(
      0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
      -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
      -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
      1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
      -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
    )
  )

  // reference values from the R ks.test run above
  val rKSStat = 0.15
  val rKSPval = 0.9831
  val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2)
  assert(kSCompResult.statistic ~== rKSStat relTol 1e-4)
  // we're more lenient with the p-value here since the approximate p-value calculated
  // by apache math commons is likely to be slightly off given the small sample size
  assert(kSCompResult.pValue ~== rKSPval relTol 1e-2)
}

test("2 sample Kolmogorov-Smirnov test: partitions with no data") {
  // we use the R data provided in the prior test
  // We request a number of partitions (40) larger than the number of elements in the data
  // sets (20), which guarantees some partitions are empty; the statistic must still match
  // the R reference value, exercising the empty-partition branch of the candidate search.
  val rData1 = sc.parallelize(
    Array(
      1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
      -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
      -0.461702683149641, -0.555540910137444, -0.0201353678515895, -0.150382224136063,
      -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
      0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
    ), 40)

  val rData2 = sc.parallelize(
    Array(
      0.236687367712904, -0.144440226694072, 0.722229700806146, 0.369906857410192,
      -0.242066314481781, -1.47206331842053, -0.596159545765696, -1.1467001312186,
      -2.47463643305885, -0.613508578410268, -0.216311514038102, 1.5901457684867,
      1.55614327565194, 1.10845089348356, -1.09734184488477, -1.86060571637755,
      -0.913578847977252, 1.24556891198713, 0.0878547183607045, 0.423481895050245
    ), 40)

  // reference statistic from the R ks.test run in the prior test
  val rKSStat = 0.15
  val rKSPval = 0.9831
  val kSCompResult = Statistics.kolmogorovSmirnovTest2Sample(rData1, rData2)
  assert(kSCompResult.statistic ~== rKSStat relTol 1e-4)
}

test("2 sample Kolmogorov-Smirnov test: partitions with just data from one sample") {
  // Two disjoint value ranges sorted globally guarantee that some partitions contain
  // elements from only one of the two samples.
  val sampleSize = 1000
  val lowerLocal = Array.tabulate(sampleSize)(i => (i + 1).toDouble)
  val upperLocal = Array.tabulate(sampleSize)(i => (sampleSize + i + 1).toDouble)
  val lowerRdd = sc.parallelize(lowerLocal, 20)
  val upperRdd = sc.parallelize(upperLocal, 20)

  // Use apache math commons local KS test to verify calculations
  val ksTest = new CommonMathKolmogorovSmirnovTest()
  val pThreshold = 0.05

  val distributedResult = Statistics.kolmogorovSmirnovTest2Sample(lowerRdd, upperRdd)
  val referenceStat = ksTest.kolmogorovSmirnovStatistic(lowerLocal, upperLocal)
  val referenceP = ksTest.kolmogorovSmirnovTest(lowerLocal, upperLocal)
  assert(distributedResult.statistic ~== referenceStat relTol 1e-3)
  assert(distributedResult.pValue ~== referenceP relTol 1e-3)
  // completely disjoint samples cannot come from the same distribution: reject H0
  assert(distributedResult.pValue < pThreshold)
}
}