From 09dda1b23098c80d10685d427febbea55e8bc31e Mon Sep 17 00:00:00 2001 From: Henry D Date: Fri, 21 Jun 2019 16:22:47 -0700 Subject: [PATCH 1/5] Accept DataFrames in RowMatrix and IndexedRowMatrix constructors --- .../mllib/api/python/PythonMLLibAPI.scala | 14 ++++++++--- .../mllib/linalg/distributed/RowMatrix.scala | 5 ++-- python/pyspark/mllib/linalg/distributed.py | 13 ++++++---- python/pyspark/mllib/tests/test_linalg.py | 24 +++++++++++++++++++ 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 322ef93473da..79c58caaabbc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -26,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import net.razorvine.pickle._ - import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.python.SerDeUtil import org.apache.spark.mllib.classification._ @@ -48,10 +47,10 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomFo import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, Strategy} import org.apache.spark.mllib.tree.impurity._ import org.apache.spark.mllib.tree.loss.Losses -import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, - RandomForestModel} +import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel} import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.LongType import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -1142,12 +1141,21 @@ private[python] class PythonMLLibAPI extends Serializable { new RowMatrix(rows.rdd, numRows, numCols) } + def createRowMatrix(df: DataFrame, numRows: Long, numCols: Int): RowMatrix = { + require(df.schema.length == 1 && df.schema.head.dataType.getClass == classOf[VectorUDT], + "DataFrame must have a single vector type column") + new RowMatrix(df.rdd.map { case Row(vector: Vector) => vector }, numRows, numCols) + } + /** * Wrapper around IndexedRowMatrix constructor. */ def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = { // We use DataFrames for serialization of IndexedRows from Python, // so map each Row in the DataFrame back to an IndexedRow. + require(rows.schema.length == 2 && rows.schema.head.dataType == LongType && + rows.schema(1).dataType.getClass == classOf[VectorUDT], + "DataFrame must consist of a long type index column and a vector type column") val indexedRows = rows.rdd.map { case Row(index: Long, vector: Vector) => IndexedRow(index, vector) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 43f48befd014..e845645a8df4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -21,16 +21,15 @@ import java.util.Arrays import scala.collection.mutable.ListBuffer -import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, DenseVector => BDV, - MatrixSingularException, SparseVector => BSV} +import breeze.linalg.{MatrixSingularException, inv, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy, svd => brzSvd} import breeze.numerics.{sqrt => brzSqrt} - import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.random.XORShiftRandom diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index b7f09782be9d..6ea1d579d657 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -30,6 +30,7 @@ from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper from pyspark.mllib.linalg import _convert_to_vector, DenseMatrix, Matrix, QRDecomposition from pyspark.mllib.stat import MultivariateStatisticalSummary +from pyspark.sql import DataFrame from pyspark.storagelevel import StorageLevel @@ -57,7 +58,7 @@ class RowMatrix(DistributedMatrix): Represents a row-oriented distributed Matrix with no meaningful row indices. - :param rows: An RDD of vectors. + :param rows: An RDD or DataFrame of vectors. :param numRows: Number of rows in the matrix. A non-positive value means unknown, at which point the number of rows will be determined by the number of @@ -73,7 +74,7 @@ def __init__(self, rows, numRows=0, numCols=0): Create a wrapper over a Java RowMatrix. - Publicly, we require that `rows` be an RDD. However, for + Publicly, we require that `rows` be an RDD or DataFrame. However, for internal usage, `rows` can also be a Java RowMatrix object, in which case we can wrap it directly. This assists in clean matrix conversions. @@ -94,6 +95,8 @@ def __init__(self, rows, numRows=0, numCols=0): if isinstance(rows, RDD): rows = rows.map(_convert_to_vector) java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols)) + elif isinstance(rows, DataFrame): + java_matrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols)) elif (isinstance(rows, JavaObject) and rows.getClass().getSimpleName() == "RowMatrix"): java_matrix = rows @@ -461,7 +464,7 @@ class IndexedRowMatrix(DistributedMatrix): """ Represents a row-oriented distributed Matrix with indexed rows. - :param rows: An RDD of IndexedRows or (long, vector) tuples. + :param rows: An RDD or DataFrame of IndexedRows or (long, vector) tuples. :param numRows: Number of rows in the matrix. A non-positive value means unknown, at which point the number of rows will be determined by the max row @@ -477,7 +480,7 @@ def __init__(self, rows, numRows=0, numCols=0): Create a wrapper over a Java IndexedRowMatrix. - Publicly, we require that `rows` be an RDD. However, for + Publicly, we require that `rows` be an RDD or DataFrame. However, for internal usage, `rows` can also be a Java IndexedRowMatrix object, in which case we can wrap it directly. This assists in clean matrix conversions. @@ -506,6 +509,8 @@ def __init__(self, rows, numRows=0, numCols=0): # IndexedRows on the Scala side. java_matrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(), long(numRows), int(numCols)) + elif isinstance(rows, DataFrame): + java_matrix = callMLlibFunc("createIndexedRowMatrix", rows, long(numRows), int(numCols)) elif (isinstance(rows, JavaObject) and rows.getClass().getSimpleName() == "IndexedRowMatrix"): java_matrix = rows diff --git a/python/pyspark/mllib/tests/test_linalg.py b/python/pyspark/mllib/tests/test_linalg.py index 703aed2fe16a..35b071808a3f 100644 --- a/python/pyspark/mllib/tests/test_linalg.py +++ b/python/pyspark/mllib/tests/test_linalg.py @@ -25,10 +25,15 @@ from pyspark.serializers import PickleSerializer from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector, \ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT +from pyspark.mllib.linalg.distributed import RowMatrix, IndexedRowMatrix from pyspark.mllib.regression import LabeledPoint +from pyspark.sql import Row from pyspark.testing.mllibutils import MLlibTestCase from pyspark.testing.utils import have_scipy +if sys.version >= '3': + long = int + class VectorTests(MLlibTestCase): @@ -431,6 +436,24 @@ def test_infer_schema(self): else: raise TypeError("expecting a vector but got %r of type %r" % (v, type(v))) + def test_row_matrix_from_dataframe(self): + from pyspark.sql.utils import IllegalArgumentException + df = self.spark.createDataFrame([Row(Vectors.dense(1))]) + row_matrix = RowMatrix(df) + self.assertEqual(row_matrix.numRows(), 1) + self.assertEqual(row_matrix.numCols(), 1) + with self.assertRaises(IllegalArgumentException): + RowMatrix(df.selectExpr("'monkey'")) + + def test_indexed_row_matrix_from_dataframe(self): + from pyspark.sql.utils import IllegalArgumentException + df = self.spark.createDataFrame([Row(long(0), Vectors.dense(1))]) + matrix = IndexedRowMatrix(df) + self.assertEqual(matrix.numRows(), 1) + self.assertEqual(matrix.numCols(), 1) + with self.assertRaises(IllegalArgumentException): + IndexedRowMatrix(df.drop("_1")) + class MatrixUDTTests(MLlibTestCase): @@ -618,6 +641,7 @@ def test_regression(self): self.assertTrue(dt_model.predict(features[3]) > 0) + if __name__ == "__main__": from pyspark.mllib.tests.test_linalg import * From 9c55664ca75e17a3c1dee0333db894659606f58a Mon Sep 17 00:00:00 2001 From: Henry D Date: Mon, 24 Jun 2019 11:32:15 -0700 Subject: [PATCH 2/5] improve docs a little --- python/pyspark/mllib/linalg/distributed.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/linalg/distributed.py b/python/pyspark/mllib/linalg/distributed.py index 6ea1d579d657..56701758c89c 100644 --- a/python/pyspark/mllib/linalg/distributed.py +++ b/python/pyspark/mllib/linalg/distributed.py @@ -58,7 +58,8 @@ class RowMatrix(DistributedMatrix): Represents a row-oriented distributed Matrix with no meaningful row indices. - :param rows: An RDD or DataFrame of vectors. + :param rows: An RDD or DataFrame of vectors. If a DataFrame is provided, it must have a single + vector typed column. :param numRows: Number of rows in the matrix. A non-positive value means unknown, at which point the number of rows will be determined by the number of @@ -464,7 +465,8 @@ class IndexedRowMatrix(DistributedMatrix): """ Represents a row-oriented distributed Matrix with indexed rows. - :param rows: An RDD or DataFrame of IndexedRows or (long, vector) tuples. + :param rows: An RDD of IndexedRows or (long, vector) tuples or a DataFrame consisting of a + long typed column of indices and a vector typed column. :param numRows: Number of rows in the matrix. A non-positive value means unknown, at which point the number of rows will be determined by the max row From 7f941e97004df1da25512223e1f2fc84dfd6caf2 Mon Sep 17 00:00:00 2001 From: Henry D Date: Mon, 24 Jun 2019 11:41:28 -0700 Subject: [PATCH 3/5] style --- python/pyspark/mllib/tests/test_linalg.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/mllib/tests/test_linalg.py b/python/pyspark/mllib/tests/test_linalg.py index 35b071808a3f..588fc6259317 100644 --- a/python/pyspark/mllib/tests/test_linalg.py +++ b/python/pyspark/mllib/tests/test_linalg.py @@ -641,7 +641,6 @@ def test_regression(self): self.assertTrue(dt_model.predict(features[3]) > 0) - if __name__ == "__main__": from pyspark.mllib.tests.test_linalg import * From d620b43c50866ea733ff184a7fc43faba867dcff Mon Sep 17 00:00:00 2001 From: Henry D Date: Sun, 7 Jul 2019 21:28:55 -0700 Subject: [PATCH 4/5] undo import changes --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 4 +++- .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 79c58caaabbc..26484a323835 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import net.razorvine.pickle._ + import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.python.SerDeUtil import org.apache.spark.mllib.classification._ @@ -47,7 +48,8 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomFo import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy, Strategy} import org.apache.spark.mllib.tree.impurity._ import org.apache.spark.mllib.tree.loss.Losses -import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, RandomForestModel} +import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel, + RandomForestModel} import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.LongType diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index e845645a8df4..6d4ded3cd8c1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -21,15 +21,16 @@ import java.util.Arrays import scala.collection.mutable.ListBuffer -import breeze.linalg.{MatrixSingularException, inv, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy, svd => brzSvd} +import breeze.linalg.{MatrixSingularException, inv, DenseMatrix => BDM, DenseVector => BDV, + SparseVector => BSV, axpy => brzAxpy, svd => brzSvd} import breeze.numerics.{sqrt => brzSqrt} + import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.internal.config.MAX_RESULT_SIZE import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.random.XORShiftRandom From 4a40143a73739aa1d6ee3d9f6209c0137c024da9 Mon Sep 17 00:00:00 2001 From: Henry D Date: Mon, 8 Jul 2019 12:54:45 -0700 Subject: [PATCH 5/5] import order --- .../org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 2 +- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 26484a323835..4c478a5477c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -52,8 +52,8 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTree RandomForestModel} import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.LongType import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.types.LongType import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 6d4ded3cd8c1..43f48befd014 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -21,8 +21,8 @@ import java.util.Arrays import scala.collection.mutable.ListBuffer -import breeze.linalg.{MatrixSingularException, inv, DenseMatrix => BDM, DenseVector => BDV, - SparseVector => BSV, axpy => brzAxpy, svd => brzSvd} +import breeze.linalg.{axpy => brzAxpy, inv, svd => brzSvd, DenseMatrix => BDM, DenseVector => BDV, + MatrixSingularException, SparseVector => BSV} import breeze.numerics.{sqrt => brzSqrt} import org.apache.spark.annotation.Since