From e129e063f3fc90c14af534e9f4b8b731dfc4fa33 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 31 Mar 2017 09:36:58 +0000
Subject: [PATCH 1/7] Add Python interface for ml.stat.Correlation.

---
 .../apache/spark/ml/stat/Correlation.scala |  2 +-
 python/pyspark/ml/stat.py                  | 56 +++++++++++++++++++
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index a7243ccbf28cc..529b457f06107 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -56,7 +56,7 @@ object Correlation {
    * Here is how to access the correlation coefficient:
    * {{{
    *   val data: Dataset[Vector] = ...
-   *   val Row(coeff: Matrix) = Statistics.corr(data, "value").head
+   *   val Row(coeff: Matrix) = Correlation.corr(data, "value").head
    *   // coeff now contains the Pearson correlation matrix.
    * }}}
    *
diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index db043ff68feca..42884cd9412eb 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -71,6 +71,62 @@ def test(dataset, featuresCol, labelCol):
         return _java2py(sc, javaTestObj.test(*args))


+class Correlation(object):
+    """
+    .. note:: Experimental
+
+    Compute the correlation matrix for the input dataset of Vectors using the specified method.
+    Methods currently supported: `pearson` (default), `spearman`.
+
+    :param dataset:
+      A dataset or a dataframe.
+    :param column:
+      The name of the column of vectors for which the correlation coefficient needs
+      to be computed. This must be a column of the dataset, and it must contain
+      Vector objects.
+    :param method:
+      String specifying the method to use for computing correlation.
+      Supported: `pearson` (default), `spearman`.
+    :return:
+      A dataframe that contains the correlation matrix of the column of vectors. This
+      dataframe contains a single row and a single column of name
+      '$METHODNAME($COLUMN)'.
+
+    >>> from pyspark.ml.linalg import Vectors
+    >>> from pyspark.ml.stat import Correlation
+    >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
+    ...            [Vectors.dense([4, 5, 0, 3])],
+    ...            [Vectors.dense([6, 7, 0, 8])],
+    ...            [Vectors.dense([9, 0, 0, 1])]]
+    >>> dataset = spark.createDataFrame(dataset, ["features"])
+    >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
+    >>> print(str(pearsonCorr).replace('nan', 'NaN'))
+    DenseMatrix([[ 1.        ,  0.05564149,         NaN,  0.40047142],
+                 [ 0.05564149,  1.        ,         NaN,  0.91359586],
+                 [        NaN,         NaN,  1.        ,         NaN],
+                 [ 0.40047142,  0.91359586,         NaN,  1.        ]])
+    >>> spearmanCorr = Correlation.corr(dataset, 'features', method="spearman").collect()[0][0]
+    >>> print(str(spearmanCorr).replace('nan', 'NaN'))
+    DenseMatrix([[ 1.        ,  0.10540926,         NaN,  0.4       ],
+                 [ 0.10540926,  1.        ,         NaN,  0.9486833 ],
+                 [        NaN,         NaN,  1.        ,         NaN],
+                 [ 0.4       ,  0.9486833 ,         NaN,  1.        ]])
+
+    .. versionadded:: 2.2.0
+
+    """
+    @staticmethod
+    @since("2.2.0")
+    def corr(dataset, column, method="pearson"):
+        """
+        Compute the correlation matrix with the specified method for the given dataset.
+        """
+        sc = SparkContext._active_spark_context
+        javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
+        args = [_py2java(sc, arg) for arg in (dataset, column, method)]
+        return _java2py(sc, javaCorrObj.corr(*args))
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.stat

From a684ac82e38fe01f68c98fff0c17a9b63dbead45 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 31 Mar 2017 11:51:52 +0000
Subject: [PATCH 2/7] Address comment.

---
 .../src/main/scala/org/apache/spark/ml/stat/Correlation.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index 529b457f06107..1e1baf5fd4336 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -62,8 +62,8 @@ object Correlation {
    *
    * @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
    * and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
-   * which is fairly costly. Cache the input RDD before calling corr with `method = "spearman"` to
-   * avoid recomputing the common lineage.
+   * which is fairly costly. Cache the input Dataset before calling corr with `method = "spearman"`
+   * to avoid recomputing the common lineage.
    */
   @Since("2.2.0")
   def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {

From fbcc1fe1c8e2652dc54c2ebfacce01a3f69449a2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 4 Apr 2017 00:44:27 +0000
Subject: [PATCH 3/7] Address comments.

---
 .../scala/org/apache/spark/ml/stat/Correlation.scala |  2 +-
 python/pyspark/ml/stat.py                            | 11 ++++++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
index 1e1baf5fd4336..bce7348964574 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala
@@ -38,7 +38,7 @@ object Correlation {

   /**
    * :: Experimental ::
-   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
+   * Compute the correlation matrix for the input Dataset of Vectors using the specified method.
    * Methods currently supported: `pearson` (default), `spearman`.
    *
    * @param dataset A dataset or a dataframe
diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 42884cd9412eb..960116e2eb99d 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -78,6 +78,11 @@ class Correlation(object):
     Compute the correlation matrix for the input dataset of Vectors using the specified method.
     Methods currently supported: `pearson` (default), `spearman`.

+    @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+    and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+    which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
+    to avoid recomputing the common lineage.
+
     :param dataset:
       A dataset or a dataframe.
     :param column:
@@ -96,16 +101,16 @@ class Correlation(object):
     >>> from pyspark.ml.stat import Correlation
     >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
     ...            [Vectors.dense([4, 5, 0, 3])],
-    ...            [Vectors.dense([6, 7, 0, 8])],
+    ...            [Vectors.dense([6, 7, 0, 8])],
     ...            [Vectors.dense([9, 0, 0, 1])]]
-    >>> dataset = spark.createDataFrame(dataset, ["features"])
+    >>> dataset = spark.createDataFrame(dataset, ['features'])
     >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
     >>> print(str(pearsonCorr).replace('nan', 'NaN'))
     DenseMatrix([[ 1.        ,  0.05564149,         NaN,  0.40047142],
                  [ 0.05564149,  1.        ,         NaN,  0.91359586],
                  [        NaN,         NaN,  1.        ,         NaN],
                  [ 0.40047142,  0.91359586,         NaN,  1.        ]])
-    >>> spearmanCorr = Correlation.corr(dataset, 'features', method="spearman").collect()[0][0]
+    >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
     >>> print(str(spearmanCorr).replace('nan', 'NaN'))
     DenseMatrix([[ 1.        ,  0.10540926,         NaN,  0.4       ],
                  [ 0.10540926,  1.        ,         NaN,  0.9486833 ],
                  [        NaN,         NaN,  1.        ,         NaN],
                  [ 0.4       ,  0.9486833 ,         NaN,  1.        ]])

From 5d9d70fbe225899e8a53a5cb2c116350236d0230 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 6 Apr 2017 07:48:41 +0000
Subject: [PATCH 4/7] Replace @note.

---
 python/pyspark/ml/stat.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 960116e2eb99d..ca8f592ac4e77 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -78,7 +78,7 @@ class Correlation(object):
     Compute the correlation matrix for the input dataset of Vectors using the specified method.
     Methods currently supported: `pearson` (default), `spearman`.

-    @note For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+    Notice: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
     and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
     which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
     to avoid recomputing the common lineage.

From fd76901c39c24be48dda970f5e4625f839ee02ed Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 6 Apr 2017 08:30:35 +0000
Subject: [PATCH 5/7] Fix doc.

---
 python/pyspark/ml/stat.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index ca8f592ac4e77..2e18e69756526 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -78,7 +78,7 @@ class Correlation(object):
     Compute the correlation matrix for the input dataset of Vectors using the specified method.
     Methods currently supported: `pearson` (default), `spearman`.

-    Notice: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
+    .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
     and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
     which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
     to avoid recomputing the common lineage.

From 5d043264103e297c52d173ac9b84b7b89833cceb Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 6 Apr 2017 08:38:51 +0000
Subject: [PATCH 6/7] Fix doc.

---
 python/pyspark/ml/stat.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 2e18e69756526..2761be4853e60 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -79,9 +79,9 @@ class Correlation(object):
     Methods currently supported: `pearson` (default), `spearman`.

     .. note:: For Spearman, a rank correlation, we need to create an RDD[Double] for each column
-    and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
-    which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
-    to avoid recomputing the common lineage.
+      and sort it in order to retrieve the ranks and then join the columns back into an RDD[Vector],
+      which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
+      to avoid recomputing the common lineage.

     :param dataset:
       A dataset or a dataframe.

From 601d9ebd3cf1f427e8b8859b921511ff839747ea Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 6 Apr 2017 13:33:18 +0000
Subject: [PATCH 7/7] Be cautious to avoid possible flaky test.

---
 python/pyspark/ml/stat.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 2761be4853e60..079b0833e1c6d 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -106,16 +106,16 @@ class Correlation(object):
     >>> dataset = spark.createDataFrame(dataset, ['features'])
     >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
     >>> print(str(pearsonCorr).replace('nan', 'NaN'))
-    DenseMatrix([[ 1.        ,  0.05564149,         NaN,  0.40047142],
-                 [ 0.05564149,  1.        ,         NaN,  0.91359586],
+    DenseMatrix([[ 1.        ,  0.0556...,         NaN,  0.4004...],
+                 [ 0.0556...,  1.        ,         NaN,  0.9135...],
                  [        NaN,         NaN,  1.        ,         NaN],
-                 [ 0.40047142,  0.91359586,         NaN,  1.        ]])
+                 [ 0.4004...,  0.9135...,         NaN,  1.        ]])
     >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
     >>> print(str(spearmanCorr).replace('nan', 'NaN'))
-    DenseMatrix([[ 1.        ,  0.10540926,         NaN,  0.4       ],
-                 [ 0.10540926,  1.        ,         NaN,  0.9486833 ],
+    DenseMatrix([[ 1.        ,  0.1054...,         NaN,  0.4       ],
+                 [ 0.1054...,  1.        ,         NaN,  0.9486... ],
                  [        NaN,         NaN,  1.        ,         NaN],
-                 [ 0.4       ,  0.9486833 ,         NaN,  1.        ]])
+                 [ 0.4       ,  0.9486... ,         NaN,  1.        ]])

     .. versionadded:: 2.2.0
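
For reference, a minimal end-to-end sketch of the API this series adds. It is a sketch, not part of the patches: it assumes Spark 2.2+ with an active SparkSession bound to `spark`, and the input rows and the 'features' column name are illustrative.

    from pyspark.ml.linalg import Vectors
    from pyspark.ml.stat import Correlation

    # Illustrative input: a DataFrame with a single column of Vectors.
    df = spark.createDataFrame(
        [(Vectors.dense([1.0, 0.0, -2.0]),),
         (Vectors.dense([4.0, 5.0, 3.0]),),
         (Vectors.dense([6.0, 7.0, 8.0]),)],
        ["features"])

    # Pearson is the default method. The result is a one-row DataFrame whose
    # single cell holds the correlation Matrix.
    pearsonMatrix = Correlation.corr(df, "features").head()[0]

    # Per the note the later patches refine: cache the input before a Spearman
    # run, since ranking each column would otherwise recompute the common
    # lineage.
    df.cache()
    spearmanMatrix = Correlation.corr(df, "features", method="spearman").head()[0]

Returning a one-row DataFrame rather than the bare matrix keeps corr composable with other Dataset operations and mirrors the Scala API documented in Correlation.scala.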