diff --git a/docs/mllib-data-types.md b/docs/mllib-data-types.md index bba1d5348da28..93f863424456c 100644 --- a/docs/mllib-data-types.md +++ b/docs/mllib-data-types.md @@ -379,14 +379,14 @@ A [`RowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.RowMatrix) ca an `RDD` of Vectors. {% highlight python %} -from pyspark.mllib.linalg import DistributedMatrices, Vectors +from pyspark.mllib.linalg import RowMatrix, Vectors # Create an RDD of Vectors. rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) # Create a RowMatrix from an RDD[Vector]. -mat = DistributedMatrices.rowMatrix(rows) +mat = RowMatrix(rows) # Get its size. m = mat.numRows() # 4 @@ -468,7 +468,7 @@ created from an `RDD` of IndexedRows, where indices. {% highlight python %} -from pyspark.mllib.linalg import Vectors, DistributedMatrices, IndexedRow, IndexedRowMatrix +from pyspark.mllib.linalg import Vectors, IndexedRow, IndexedRowMatrix # Create an RDD of indexed rows. # - This can be done explicitly with the IndexedRow class: @@ -481,7 +481,7 @@ indexedRows = sc.parallelize([(0, Vectors.dense([1, 2, 3])), (1, Vectors.dense([ (2, Vectors.dense([7, 8, 9])), (3, Vectors.dense([10, 11, 12]))]) # Create an IndexedRowMatrix from an RDD[IndexedRow]. -mat = DistributedMatrices.indexedRowMatrix(indexedRows) +mat = IndexedRowMatrix(indexedRows) # Get its size. m = mat.numRows() # 4 @@ -572,7 +572,7 @@ created from a `RDD` of MatrixEntry entries, where `toRowMatrix`, or to an `IndexedRowMatrix` with sparse rows by calling `toIndexedRowMatrix`. {% highlight python %} -from pyspark.mllib.linalg import DistributedMatrices, MatrixEntry, CoordinateMatrix +from pyspark.mllib.linalg import MatrixEntry, CoordinateMatrix # Create an RDD of coordinate entries. 
# - This can be done explicitly with the MatrixEntry class: @@ -581,7 +581,7 @@ entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), Matrix entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)]) # Create an CoordinateMatrix from an RDD[MatrixEntry]. -mat = DistributedMatrices.coordinateMatrix(entries) +mat = CoordinateMatrix(entries) # Get its size. m = mat.numRows() # 3 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index d8e2dd3f6ed06..036795254b7dd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -21,7 +21,7 @@ import java.io.OutputStream import java.nio.{ByteBuffer, ByteOrder} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} -import org.apache.spark.mllib.linalg.distributed.{DistributedMatrices, RowMatrix, IndexedRowMatrix, IndexedRow, +import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix, IndexedRow, MatrixEntry, CoordinateMatrix} import scala.collection.JavaConverters._ @@ -1109,14 +1109,14 @@ private[python] class PythonMLLibAPI extends Serializable { } /** - * Wrapper around DistributedMatrices.rowMatrix factory method. + * Wrapper around RowMatrix. */ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { - DistributedMatrices.rowMatrix(rows.rdd, numRows, numCols) + new RowMatrix(rows.rdd, numRows, numCols) } /** - * Wrapper around DistributedMatrices.indexedRowMatrix factory method. + * Wrapper around IndexedRowMatrix. 
*/ def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = { // We use DataFrames for serialization of IndexedRows from Python, so map each Row in the @@ -1124,7 +1124,7 @@ private[python] class PythonMLLibAPI extends Serializable { val indexedRows = rows.map { case Row(index: Long, vector: Vector) => IndexedRow(index, vector) } - DistributedMatrices.indexedRowMatrix(indexedRows, numRows, numCols) + new IndexedRowMatrix(indexedRows, numRows, numCols) } /** @@ -1137,7 +1137,7 @@ private[python] class PythonMLLibAPI extends Serializable { } /** - * Wrapper around DistributedMatrices.coordinateMatrix factory method. + * Wrapper around CoordinateMatrix. */ def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = { // We use DataFrames for serialization of MatrixEntry entries from Python, so map each Row in @@ -1145,7 +1145,7 @@ private[python] class PythonMLLibAPI extends Serializable { val entries = rows.map { case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value) } - DistributedMatrices.coordinateMatrix(entries, numRows, numCols) + new CoordinateMatrix(entries, numRows, numCols) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala deleted file mode 100644 index 014d778529487..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/DistributedMatrix.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.linalg.distributed - -import breeze.linalg.{DenseMatrix => BDM} -import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.rdd.RDD - -/** - * Represents a distributively stored matrix backed by one or more RDDs. - */ -trait DistributedMatrix extends Serializable { - - /** Gets or computes the number of rows. */ - def numRows(): Long - - /** Gets or computes the number of columns. */ - def numCols(): Long - - /** Collects data and assembles a local dense breeze matrix (for test only). */ - private[mllib] def toBreeze(): BDM[Double] -} - -/** - * Factory methods for [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]]. - */ -object DistributedMatrices { - - /** - * Creates a Row Matrix. - * - * @param rows A RDD[Vector] - * @param numRows Number of rows in the matrix - * @param numCols Number of columns in the matrix - */ - def rowMatrix(rows: RDD[Vector], numRows: Long = 0, numCols: Int = 0): RowMatrix = { - new RowMatrix(rows, numRows, numCols) - } - - /** - * Creates an IndexedRowMatrix. - * - * @param rows A RDD[IndexedRow] - * @param numRows Number of rows in the matrix - * @param numCols Number of columns in the matrix - */ - def indexedRowMatrix( - rows: RDD[IndexedRow], - numRows: Long = 0, - numCols: Int = 0): IndexedRowMatrix = { - new IndexedRowMatrix(rows, numRows, numCols) - } - - /** - * Creates a CoordinateMatrix. 
- * - * @param rows A RDD[MatrixEntry] - * @param numRows Number of rows in the matrix - * @param numCols Number of columns in the matrix - */ - def coordinateMatrix( - rows: RDD[MatrixEntry], - numRows: Long = 0, - numCols: Long = 0): CoordinateMatrix = { - new CoordinateMatrix(rows, numRows, numCols) - } -} diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index b50cf814b4b2a..98c230052822f 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -37,15 +37,15 @@ import numpy as np -from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper +from pyspark import RDD +from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper, inherit_doc from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ IntegerType, ByteType, BooleanType __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', 'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices', - 'DistributedMatrix', 'DistributedMatrices', 'RowMatrix', - 'IndexedRow', 'IndexedRowMatrix', 'MatrixEntry', + 'RowMatrix', 'IndexedRow', 'IndexedRowMatrix', 'MatrixEntry', 'CoordinateMatrix'] @@ -1171,77 +1171,22 @@ def numCols(self): raise NotImplementedError -class DistributedMatrices(object): - """Factory methods for distributed matrices.""" - @staticmethod - def rowMatrix(rows, numRows=0, numCols=0): - """ - Create a RowMatrix. +@inherit_doc +class RowMatrixModel(DistributedMatrix, JavaModelWrapper): - :param rows: An RDD of Vectors. - """ - javaRowMatrix = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols)) - jrm = JavaModelWrapper(javaRowMatrix) - return RowMatrix(jrm) - - @staticmethod - def indexedRowMatrix(rows, numRows=0, numCols=0): - """ - Create an IndexedRowMatrix. - - :param rows: An RDD of IndexedRows or (long, Vector) tuples. - """ - # We use DataFrames for serialization of IndexedRows from Python, so convert the RDD to a - # DataFrame. 
This will convert each IndexedRow to a Row containing the 'index' and 'vector' - # values, which can both be easily serialized. We will convert back to IndexedRows on the - # Scala side. - javaIndexedRowMatrix = callMLlibFunc("createIndexedRowMatrix", rows.toDF(), - long(numRows), int(numCols)) - jirm = JavaModelWrapper(javaIndexedRowMatrix) - return IndexedRowMatrix(jirm) - - @staticmethod - def coordinateMatrix(entries, numRows=0, numCols=0): - """ - Create a CoordinateMatrix. - - :param entries: An RDD of MatrixEntry inputs or - (long, long, float) tuples. - """ - # We use DataFrames for serialization of MatrixEntry inputs from Python, so convert the RDD - # to a DataFrame. This will convert each MatrixEntry to a Row containing the 'i', 'j', and - # 'value' values, which can each be easily serialized. We will convert back to MatrixEntry - # inputs on the Scala side. - javaCoordinateMatrix = callMLlibFunc("createCoordinateMatrix", entries.toDF(), - long(numRows), long(numCols)) - jcm = JavaModelWrapper(javaCoordinateMatrix) - return CoordinateMatrix(jcm) - - -class RowMatrix(DistributedMatrix): - """ - Represents a row-oriented distributed Matrix with no meaningful - row indices. - - .. note:: Experimental - """ - def __init__(self, jrm): - """Create a wrapper over a Java RowMatrix.""" - self._jrm = jrm - self.rows = self._rows() - - def _rows(self): + @property + def rows(self): """ Get the rows of the RowMatrix as an RDD of Vectors. >>> rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), ... Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) - >>> rm = DistributedMatrices.rowMatrix(rows) + >>> rm = RowMatrix(rows) >>> rowsRDD = rm.rows >>> rowsRDD.first() DenseVector([1.0, 2.0, 3.0]) """ - return self._jrm.call("rows") + return self.call("rows") def numRows(self): """ @@ -1249,17 +1194,14 @@ def numRows(self): >>> rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), ... 
Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) - >>> rm = DistributedMatrices.rowMatrix(rows) + >>> rm = RowMatrix(rows) >>> int(rm.numRows()) 4 - - >>> rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), - ... Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) - >>> rm = DistributedMatrices.rowMatrix(rows, 7, 6) + >>> rm = RowMatrix(rows, 7, 6) >>> int(rm.numRows()) 7 """ - return self._jrm.call("numRows") + return self.call("numRows") def numCols(self): """ @@ -1267,26 +1209,43 @@ def numCols(self): >>> rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), ... Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) - >>> rm = DistributedMatrices.rowMatrix(rows) + >>> rm = RowMatrix(rows) >>> int(rm.numCols()) 3 - - >>> rows = sc.parallelize([Vectors.dense([1, 2, 3]), Vectors.dense([4, 5, 6]), - ... Vectors.dense([7, 8, 9]), Vectors.dense([10, 11, 12])]) - >>> rm = DistributedMatrices.rowMatrix(rows, 7, 6) + >>> rm = RowMatrix(rows, 7, 6) >>> int(rm.numCols()) 6 """ - return self._jrm.call("numCols") + return self.call("numCols") + + +@inherit_doc +class RowMatrix(RowMatrixModel): + """ + .. note:: Experimental + + Represents a row-oriented distributed Matrix with no meaningful + row indices. + + Create a RowMatrix. + + :param rows: An RDD of Vectors. + :param numRows: Number of rows in the RowMatrix, optional + :param numCols: Number of columns in the RowMatrix, optional + """ + def __init__(self, rows, numRows=0, numCols=0): + java_model = callMLlibFunc("createRowMatrix", rows, long(numRows), int(numCols)) + super(RowMatrix, self).__init__(java_model=java_model) class IndexedRow(object): """ + .. note:: Experimental + Represents a row of an IndexedRowMatrix. Just a wrapper over a (long, Vector) tuple. - .. 
note:: Experimental """ def __init__(self, index, vector): self.index = long(index) @@ -1296,37 +1255,13 @@ def __repr__(self): return "IndexedRow(%s, %s)" % (self.index, self.vector) -class IndexedRowMatrix(DistributedMatrix): - """ - Represents a row-oriented distributed Matrix with indexed rows. +@inherit_doc +class IndexedRowMatrixModel(DistributedMatrix, JavaModelWrapper): - .. note:: Experimental - """ - def __init__(self, jirm): - """Create a wrapper over a Java IndexedRowMatrix.""" - self._jirm = jirm + def __init__(self, java_model): + super(IndexedRowMatrixModel, self).__init__(java_model=java_model) self.rows = self._rows() - def _rows(self): - """ - Get the rows of the IndexedRowMatrix as an RDD of IndexedRows. - - >>> rows = sc.parallelize([IndexedRow(0, Vectors.dense([1, 2, 3])), - ... IndexedRow(1, Vectors.dense([4, 5, 6])), - ... IndexedRow(2, Vectors.dense([7, 8, 9])), - ... IndexedRow(3, Vectors.dense([10, 11, 12]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows) - >>> rowsRDD = rm.rows - >>> rowsRDD.first() - IndexedRow(0, [1.0,2.0,3.0]) - """ - # We use DataFrames for serialization of IndexedRows between Python andJava, so we convert - # the RDD of rows to a DataFrame first on the Scala/Java side. Then we map each Row in the - # DataFrame back to an IndexedRow on this side. - rowsDF = callMLlibFunc("getIndexedRows", self._jirm._java_model) - rowsRDD = rowsDF.map(lambda row: IndexedRow(row[0], row[1])) - return rowsRDD - def numRows(self): """ Get or compute the number of rows. @@ -1335,19 +1270,14 @@ def numRows(self): ... IndexedRow(1, Vectors.dense([4, 5, 6])), ... IndexedRow(2, Vectors.dense([7, 8, 9])), ... IndexedRow(3, Vectors.dense([10, 11, 12]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows) + >>> rm = IndexedRowMatrix(rows) >>> int(rm.numRows()) 4 - - >>> rows = sc.parallelize([IndexedRow(0, Vectors.dense([1, 2, 3])), - ... IndexedRow(1, Vectors.dense([4, 5, 6])), - ... IndexedRow(2, Vectors.dense([7, 8, 9])), - ... 
IndexedRow(3, Vectors.dense([10, 11, 12]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows, 7, 6) + >>> rm = IndexedRowMatrix(rows, 7, 6) >>> int(rm.numRows()) 7 """ - return self._jirm.call("numRows") + return self.call("numRows") def numCols(self): """ @@ -1357,19 +1287,34 @@ def numCols(self): ... IndexedRow(1, Vectors.dense([4, 5, 6])), ... IndexedRow(2, Vectors.dense([7, 8, 9])), ... IndexedRow(3, Vectors.dense([10, 11, 12]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows) + >>> rm = IndexedRowMatrix(rows) >>> int(rm.numCols()) 3 + >>> rm = IndexedRowMatrix(rows, 7, 6) + >>> int(rm.numCols()) + 6 + """ + return self.call("numCols") + + def _rows(self): + """ + Get the rows of the IndexedRowMatrix as an RDD of IndexedRows. >>> rows = sc.parallelize([IndexedRow(0, Vectors.dense([1, 2, 3])), ... IndexedRow(1, Vectors.dense([4, 5, 6])), ... IndexedRow(2, Vectors.dense([7, 8, 9])), ... IndexedRow(3, Vectors.dense([10, 11, 12]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows, 7, 6) - >>> int(rm.numCols()) - 6 + >>> irm = IndexedRowMatrix(rows) + >>> rowsRDD = irm.rows + >>> rowsRDD.first() + IndexedRow(0, [1.0,2.0,3.0]) """ - return self._jirm.call("numCols") + # We use DataFrames for serialization of IndexedRows between Python and Java, so we convert + # the RDD of rows to a DataFrame first on the Scala/Java side. Then we map each Row in the + # DataFrame back to an IndexedRow on this side. + rowsDF = callMLlibFunc("getIndexedRows", self._java_model) + rowsRDD = rowsDF.map(lambda row: IndexedRow(row[0], row[1])) + return rowsRDD def toRowMatrix(self): """ @@ -1378,13 +1323,11 @@ def toRowMatrix(self): >>> # This IndexedRowMatrix will have 7 rows, due to the highest index being 6 >>> rows = sc.parallelize([IndexedRow(0, Vectors.dense([1, 2, 3])), ...
IndexedRow(6, Vectors.dense([4, 5, 6]))]) - >>> rm = DistributedMatrices.indexedRowMatrix(rows).toRowMatrix() + >>> rm = IndexedRowMatrix(rows).toRowMatrix() >>> rm.rows.collect() [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0])] """ - javaRowMatrix = self._jirm.call("toRowMatrix") - jrm = JavaModelWrapper(javaRowMatrix) - return RowMatrix(jrm) + return RowMatrixModel(self.call("toRowMatrix")) def toCoordinateMatrix(self): """ @@ -1393,22 +1336,41 @@ def toCoordinateMatrix(self): >>> # This IndexedRowMatrix will have 7 rows, due to the highest index being 6 >>> rows = sc.parallelize([IndexedRow(0, Vectors.dense([1, 0])), ... IndexedRow(6, Vectors.dense([0, 5]))]) - >>> cm = DistributedMatrices.indexedRowMatrix(rows).toCoordinateMatrix() + >>> cm = IndexedRowMatrix(rows).toCoordinateMatrix() >>> cm.entries.take(3) [MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)] """ - javaCoordinateMatrix = self._jirm.call("toCoordinateMatrix") - jcm = JavaModelWrapper(javaCoordinateMatrix) - return CoordinateMatrix(jcm) + return CoordinateMatrixModel(self.call("toCoordinateMatrix")) + + +@inherit_doc +class IndexedRowMatrix(IndexedRowMatrixModel): + """ + .. note:: Experimental + + Represents a row-oriented distributed Matrix with indexed rows. + + :param rows: An RDD of IndexedRows or (long, Vector) tuples. + :param numRows: Number of rows in the IndexedRowMatrix, optional + :param numCols: Number of columns in the IndexedRowMatrix, optional + """ + def __init__(self, rows, numRows=0, numCols=0): + # We use DataFrames for serialization of IndexedRows from Python, so convert the RDD to a + # DataFrame. This will convert each IndexedRow to a Row containing the 'index' and 'vector' + # values, which can both be easily serialized. We will convert back to IndexedRows on the + # Scala side.
+ java_model = callMLlibFunc( + "createIndexedRowMatrix", rows.toDF(), long(numRows), int(numCols)) + super(IndexedRowMatrix, self).__init__(java_model=java_model) class MatrixEntry(object): """ + .. note:: Experimental + Represents an entry of a CoordinateMatrix. Just a wrapper over a (long, long, float) tuple. - - .. note:: Experimental """ def __init__(self, i, j, value): self.i = long(i) @@ -1419,15 +1381,20 @@ def __repr__(self): return "MatrixEntry(%s, %s, %s)" % (self.i, self.j, self.value) -class CoordinateMatrix(object): +@inherit_doc +class CoordinateMatrixModel(DistributedMatrix, JavaModelWrapper): """ + .. note:: Experimental + Represents a matrix in coordinate format. - .. note:: Experimental + :param entries: An RDD of MatrixEntry inputs or + (long, long, float) tuples. + :param numRows: Number of rows in the CoordinateMatrix, optional + :param numCols: Number of columns in the CoordinateMatrix, optional """ - def __init__(self, jcm): - """Create a wrapper over a Java CoordinateMatrix.""" - self._jcm = jcm + def __init__(self, java_model): + super(CoordinateMatrixModel, self).__init__(java_model=java_model) self.entries = self._entries() def _entries(self): @@ -1438,7 +1405,7 @@ def _entries(self): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(1, 0, 2.1), ... MatrixEntry(2, 1, 3.7)]) - >>> cm = DistributedMatrices.coordinateMatrix(entries) + >>> cm = CoordinateMatrix(entries) >>> entriesRDD = cm.entries >>> entriesRDD.first() MatrixEntry(0, 0, 1.2) @@ -1446,7 +1413,7 @@ def _entries(self): # We use DataFrames for serialization of MatrixEntry entries between Python and Java, so # we convert the RDD of entries to a DataFrame first on the Scala/Java side. Then we map # each Row in the DataFrame back to a MatrixEntry on this side.
- entriesDF = callMLlibFunc("getMatrixEntries", self._jcm._java_model) + entriesDF = callMLlibFunc("getMatrixEntries", self._java_model) entriesRDD = entriesDF.map(lambda row: MatrixEntry(row[0], row[1], row[2])) return entriesRDD @@ -1457,18 +1424,14 @@ def numRows(self): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(1, 0, 2.1), ... MatrixEntry(2, 1, 3.7)]) - >>> cm = DistributedMatrices.coordinateMatrix(entries) + >>> cm = CoordinateMatrix(entries) >>> int(cm.numRows()) 3 - - >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), - ... MatrixEntry(1, 0, 2.1), - ... MatrixEntry(2, 1, 3.7)]) - >>> cm = DistributedMatrices.coordinateMatrix(entries, 7, 6) + >>> cm = CoordinateMatrix(entries, 7, 6) >>> int(cm.numRows()) 7 """ - return self._jcm.call("numRows") + return self.call("numRows") def numCols(self): """ @@ -1477,18 +1440,14 @@ def numCols(self): >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), ... MatrixEntry(1, 0, 2.1), ... MatrixEntry(2, 1, 3.7)]) - >>> cm = DistributedMatrices.coordinateMatrix(entries) + >>> cm = CoordinateMatrix(entries) >>> int(cm.numCols()) 2 - - >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), - ... MatrixEntry(1, 0, 2.1), - ... MatrixEntry(2, 1, 3.7)]) - >>> cm = DistributedMatrices.coordinateMatrix(entries, 7, 6) + >>> cm = CoordinateMatrix(entries, 7, 6) >>> int(cm.numCols()) 6 """ - return self._jcm.call("numCols") + return self.call("numCols") def toRowMatrix(self): """ @@ -1498,20 +1457,16 @@ def toRowMatrix(self): >>> # being 6, but the ensuing RowMatrix will only have 2 rows since their are only >>> # entries on 2 unique rows. 
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(6, 4, 2.1)]) - >>> rm = DistributedMatrices.coordinateMatrix(entries).toRowMatrix() + >>> rm = CoordinateMatrix(entries).toRowMatrix() >>> int(rm.numRows()) 2 - >>> # This CoordinateMatrix will have 5 columns, due to the highest column index being 4, + >>> # The CoordinateMatrix will have 5 columns, due to the highest column index being 4, >>> # and the ensuing RowMatrix will have 5 columns as well. - >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(6, 4, 2.1)]) - >>> rm = DistributedMatrices.coordinateMatrix(entries).toRowMatrix() >>> int(rm.numCols()) 5 """ - javaRowMatrix = self._jcm.call("toRowMatrix") - jrm = JavaModelWrapper(javaRowMatrix) - return RowMatrix(jrm) + return RowMatrixModel(self.call("toRowMatrix")) def toIndexedRowMatrix(self): """ @@ -1520,20 +1475,28 @@ def toIndexedRowMatrix(self): >>> # This CoordinateMatrix will have 7 effective rows, due to the highest row index >>> # being 6, and the ensuing IndexedRowMatrix will have 7 rows as well. >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(6, 4, 2.1)]) - >>> irm = DistributedMatrices.coordinateMatrix(entries).toIndexedRowMatrix() + >>> irm = CoordinateMatrix(entries).toIndexedRowMatrix() >>> int(irm.numRows()) 7 >>> # This CoordinateMatrix will have 5 columns, due to the highest column index being 4, >>> # and the ensuing IndexedRowMatrix will have 5 columns as well. 
- >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(6, 4, 2.1)]) - >>> irm = DistributedMatrices.coordinateMatrix(entries).toIndexedRowMatrix() >>> int(irm.numCols()) 5 """ - javaIndexedRowMatrix = self._jcm.call("toIndexedRowMatrix") - jirm = JavaModelWrapper(javaIndexedRowMatrix) - return IndexedRowMatrix(jirm) + return IndexedRowMatrixModel(self.call("toIndexedRowMatrix")) + + +@inherit_doc +class CoordinateMatrix(CoordinateMatrixModel): + def __init__(self, entries, numRows=0, numCols=0): + # We use DataFrames for serialization of MatrixEntry inputs from Python, so convert the RDD + # to a DataFrame. This will convert each MatrixEntry to a Row containing the 'i', 'j', and + # 'value' values, which can each be easily serialized. We will convert back to MatrixEntry + # inputs on the Scala side. + java_model = callMLlibFunc( + "createCoordinateMatrix", entries.toDF(), long(numRows), long(numCols)) + super(CoordinateMatrix, self).__init__(java_model=java_model) def _test():