Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/mllib-statistics.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,46 @@ for (ChiSqTestResult result : featureTestResults) {
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
[`Statistics`](api/python/index.html#pyspark.mllib.stat.Statistics$) provides methods to
run Pearson's chi-squared tests. The following example demonstrates how to run and interpret
hypothesis tests.

{% highlight python %}
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors, Matrices
from pyspark.mllib.regresssion import LabeledPoint
from pyspark.mllib.stat import Statistics

sc = SparkContext()

vec = Vectors.dense(...) # a vector composed of the frequencies of events

# compute the goodness of fit. If a second vector to test against is not supplied as a parameter,
# the test runs against a uniform distribution.
goodnessOfFitTestResult = Statistics.chiSqTest(vec)
print goodnessOfFitTestResult # summary of the test including the p-value, degrees of freedom,
# test statistic, the method used, and the null hypothesis.

mat = Matrices.dense(...) # a contingency matrix

# conduct Pearson's independence test on the input contingency matrix
independenceTestResult = Statistics.chiSqTest(mat)
print independenceTestResult # summary of the test including the p-value, degrees of freedom...

obs = sc.parallelize(...) # LabeledPoint(feature, label) .

# The contingency table is constructed from an RDD of LabeledPoint and used to conduct
# the independence test. Returns an array containing the ChiSquaredTestResult for every feature
# against the label.
featureTestResults = Statistics.chiSqTest(obs)

for i, result in enumerate(featureTestResults):
print "Column $d:" % (i + 1)
print result
{% endhighlight %}
</div>

</div>

## Random data generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.mllib.tree.impurity._
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.stat.correlation.CorrelationNames
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -454,6 +455,33 @@ class PythonMLLibAPI extends Serializable {
Statistics.corr(x.rdd, y.rdd, getCorrNameOrDefault(method))
}

/**
* Java stub for mllib Statistics.chiSqTest()
*/
def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult = {
if (expected == null) {
Statistics.chiSqTest(observed)
} else {
Statistics.chiSqTest(observed, expected)
}
}

/**
* Java stub for mllib Statistics.chiSqTest(observed: Matrix)
* @param observed
* @return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove @param and @return

*/
def chiSqTest(observed: Matrix): ChiSqTestResult = {
Statistics.chiSqTest(observed)
}

/**
* Java stub for mllib Statistics.chiSqTest(RDD[LabelPoint])
*/
def chiSqTest(data: JavaRDD[LabeledPoint]): Array[ChiSqTestResult] = {
Statistics.chiSqTest(data.rdd)
}

// used by the corr methods to retrieve the name of the correlation method passed in via pyspark
private def getCorrNameOrDefault(method: String) = {
if (method == null) CorrelationNames.defaultCorrName else method
Expand Down
7 changes: 6 additions & 1 deletion python/pyspark/mllib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ def _java2py(sc, r):
jrdd = sc._jvm.SerDe.javaToPython(r)
return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))

elif isinstance(r, (JavaArray, JavaList)) or clsName in _picklable_classes:
if clsName in _picklable_classes:
r = sc._jvm.SerDe.dumps(r)
elif isinstance(r, (JavaArray, JavaList)):
try:
r = sc._jvm.SerDe.dumps(r)
except Py4JJavaError:
pass # not pickable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if r is JavaArray or JavaList but not pickleable? Are we expecting that downstream can handle it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller will handle it. The JavaArray/JavaList is iterable in Python, caller can access the internal objects in this array/list.


if isinstance(r, bytearray):
r = PickleSerializer().loads(str(r))
Expand Down
13 changes: 12 additions & 1 deletion python/pyspark/mllib/linalg.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
IntegerType, ByteType, Row


__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', 'DenseMatrix', 'Matrices']


if sys.version_info[:2] == (2, 7):
Expand Down Expand Up @@ -578,6 +578,8 @@ class DenseMatrix(Matrix):
def __init__(self, numRows, numCols, values):
Matrix.__init__(self, numRows, numCols)
assert len(values) == numRows * numCols
if not isinstance(values, array.array):
values = array.array('d', values)
self.values = values

def __reduce__(self):
Expand All @@ -596,6 +598,15 @@ def toArray(self):
return np.reshape(self.values, (self.numRows, self.numCols), order='F')


class Matrices(object):
@staticmethod
def dense(numRows, numCols, values):
"""
Create a DenseMatrix
"""
return DenseMatrix(numRows, numCols, values)


def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
Expand Down
137 changes: 135 additions & 2 deletions python/pyspark/mllib/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
Python package for statistical functions in MLlib.
"""

from pyspark import RDD
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.linalg import Matrix, _convert_to_vector


__all__ = ['MultivariateStatisticalSummary', 'Statistics']
__all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']


class MultivariateStatisticalSummary(JavaModelWrapper):
Expand Down Expand Up @@ -51,6 +52,54 @@ def min(self):
return self.call("min").toArray()


class ChiSqTestResult(JavaModelWrapper):
"""
:: Experimental ::

Object containing the test results for the chi-squared hypothesis test.
"""
@property
def method(self):
"""
Name of the test method
"""
return self._java_model.method()

@property
def pValue(self):
"""
The probability of obtaining a test statistic result at least as
extreme as the one that was actually observed, assuming that the
null hypothesis is true.
"""
return self._java_model.pValue()

@property
def degreesOfFreedom(self):
"""
Returns the degree(s) of freedom of the hypothesis test.
Return type should be Number(e.g. Int, Double) or tuples of Numbers.
"""
return self._java_model.degreesOfFreedom()

@property
def statistic(self):
"""
Test statistic.
"""
return self._java_model.statistic()

@property
def nullHypothesis(self):
"""
Null hypothesis of the test.
"""
return self._java_model.nullHypothesis()

def __str__(self):
return self._java_model.toString()


class Statistics(object):

@staticmethod
Expand Down Expand Up @@ -135,6 +184,90 @@ def corr(x, y=None, method=None):
else:
return callMLlibFunc("corr", x.map(float), y.map(float), method)

@staticmethod
def chiSqTest(observed, expected=None):
"""
:: Experimental ::

If `observed` is Vector, conduct Pearson's chi-squared goodness
of fit test of the observed data against the expected distribution,
or againt the uniform distribution (by default), with each category
having an expected frequency of `1 / len(observed)`.
(Note: `observed` cannot contain negative values)

If `observed` is matrix, conduct Pearson's independence test on the
input contingency matrix, which cannot contain negative entries or
columns or rows that sum up to 0.

If `observed` is an RDD of LabeledPoint, conduct Pearson's independence
test for every feature against the label across the input RDD.
For each feature, the (feature, label) pairs are converted into a
contingency matrix for which the chi-squared statistic is computed.
All label and feature values must be categorical.

:param observed: it could be a vector containing the observed categorical
counts/relative frequencies, or the contingency matrix
(containing either counts or relative frequencies),
or an RDD of LabeledPoint containing the labeled dataset
with categorical features. Real-valued features will be
treated as categorical for each distinct value.
:param expected: Vector containing the expected categorical counts/relative
frequencies. `expected` is rescaled if the `expected` sum
differs from the `observed` sum.
:return: ChiSquaredTest object containing the test statistic, degrees
of freedom, p-value, the method used, and the null hypothesis.

>>> from pyspark.mllib.linalg import Vectors, Matrices
>>> observed = Vectors.dense([4, 6, 5])
>>> pearson = Statistics.chiSqTest(observed)
>>> print pearson.statistic
0.4
>>> pearson.degreesOfFreedom
2
>>> print round(pearson.pValue, 4)
0.8187
>>> pearson.method
u'pearson'
>>> pearson.nullHypothesis
u'observed follows the same distribution as expected.'

>>> observed = Vectors.dense([21, 38, 43, 80])
>>> expected = Vectors.dense([3, 5, 7, 20])
>>> pearson = Statistics.chiSqTest(observed, expected)
>>> print round(pearson.pValue, 4)
0.0027

>>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
>>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
>>> print round(chi.statistic, 4)
21.9958

>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
... LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
... LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
>>> rdd = sc.parallelize(data, 4)
>>> chi = Statistics.chiSqTest(rdd)
>>> print chi[0].statistic
0.75
>>> print chi[1].statistic
1.5
"""
if isinstance(observed, RDD):
jmodels = callMLlibFunc("chiSqTest", observed)
return [ChiSqTestResult(m) for m in jmodels]

if isinstance(observed, Matrix):
jmodel = callMLlibFunc("chiSqTest", observed)
else:
if expected and len(expected) != len(observed):
raise ValueError("`expected` should have same length with `observed`")
jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected)
return ChiSqTestResult(jmodel)


def _test():
import doctest
Expand Down