41 commits
d7e316f
Implemented the RowMatrix API in PySpark by doing the following: Adde…
dusenberrymw Jun 26, 2015
bc2d220
Adding unit tests for RowMatrix methods.
dusenberrymw Jun 26, 2015
b194623
Updating design to have a PySpark RowMatrix simply create and keep a …
dusenberrymw Jun 29, 2015
23bf1ec
Updating documentation to add PySpark RowMatrix. Inserting newline ab…
dusenberrymw Jun 29, 2015
4bdd09b
Implemented the IndexedRowMatrix API in PySpark, following the idea o…
dusenberrymw Jul 20, 2015
3c369cb
Updating the architecture a bit to make conversions between the vario…
dusenberrymw Jul 20, 2015
0cd7166
Implemented the CoordinateMatrix API in PySpark, following the idea o…
dusenberrymw Jul 20, 2015
dda2f89
Added wrappers for the conversions between the various distributed ma…
dusenberrymw Jul 21, 2015
ab0e8b6
Updating unit test to be more useful.
dusenberrymw Jul 21, 2015
f721ead
Updating documentation for each of the distributed matrices.
dusenberrymw Jul 21, 2015
7e3ca16
Fixing long lines.
dusenberrymw Jul 21, 2015
4d3a37e
Reformatting a few long Python doctest lines.
dusenberrymw Jul 21, 2015
a245dc0
Updating Python doctests for compatability between Python 2 & 3. Sinc…
dusenberrymw Jul 21, 2015
08f287b
Slight reformatting of the documentation.
dusenberrymw Jul 22, 2015
6a3ecb7
Updating pattern matching.
dusenberrymw Jul 23, 2015
f6f3c68
Pulling the DistributedMatrices Scala class out of this pull request.
dusenberrymw Jul 25, 2015
93b6a3d
Pulling the DistributedMatrices Python class out of this pull request.
dusenberrymw Jul 25, 2015
4d7af86
Adding type checks to the constructors. Although it is slightly verb…
dusenberrymw Jul 26, 2015
119018d
Adding static methods to each of the distributed matrix classes to c…
dusenberrymw Jul 26, 2015
10046e8
Updating documentation to use class constructors instead of the remov…
dusenberrymw Jul 26, 2015
3b854b9
Minor updates to documentation.
dusenberrymw Jul 27, 2015
4ad6819
Documenting the and parameters.
dusenberrymw Jul 27, 2015
c0900df
Adding the colons that were accidentally not inserted.
dusenberrymw Jul 27, 2015
0be6826
Simplifying doctests by removing duplicated rows/entries RDDs within …
dusenberrymw Jul 27, 2015
329638b
Moving the Experimental tag to the top of each docstring.
dusenberrymw Jul 27, 2015
c6bded5
Move conversion logic from tuples to IndexedRow or MatrixEntry types …
dusenberrymw Jul 27, 2015
4bd756d
Adding param documentation to IndexedRow and MatrixEntry.
dusenberrymw Jul 27, 2015
d19b0ba
Updating code and documentation to note that a vector-like object (nu…
dusenberrymw Jul 27, 2015
a409cf5
Updating doctests to be less verbose by using lists instead of DenseV…
dusenberrymw Jul 27, 2015
27cd5f6
Simplifying input conversions in the constructors for each distribute…
dusenberrymw Jul 29, 2015
3fd4016
Updating docstrings.
dusenberrymw Jul 29, 2015
ffdd724
Updating doctests to make documentation cleaner.
dusenberrymw Jul 29, 2015
f0c13a7
CoordinateMatrix should inherit from DistributedMatrix.
dusenberrymw Jul 29, 2015
1633f86
Minor documentation cleanup.
dusenberrymw Jul 29, 2015
308f197
Using properties for better documentation.
dusenberrymw Jul 30, 2015
3e50b6e
Moving the distributed matrices to pyspark.mllib.linalg.distributed.
dusenberrymw Jul 31, 2015
687e345
Improving conversion performance. This adds an optional 'java_matrix…
dusenberrymw Aug 1, 2015
cfc1be5
Use 'new SQLContext(matrix.rows.sparkContext)' rather than 'SQLContex…
dusenberrymw Aug 3, 2015
7f0dcb6
Updating module docstring.
dusenberrymw Aug 3, 2015
b887c18
Updating the matrix conversion logic again to make it even cleaner. …
dusenberrymw Aug 4, 2015
bb039cb
Minor documentation update.
dusenberrymw Aug 4, 2015
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -331,6 +331,7 @@ def contains_file(self, filename):
"pyspark.mllib.feature",
"pyspark.mllib.fpm",
"pyspark.mllib.linalg.__init__",
"pyspark.mllib.linalg.distributed",
"pyspark.mllib.random",
"pyspark.mllib.recommendation",
"pyspark.mllib.regression",
106 changes: 104 additions & 2 deletions docs/mllib-data-types.md
@@ -372,12 +372,37 @@ long m = mat.numRows();
long n = mat.numCols();
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">

A [`RowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix) can be
created from an `RDD` of vectors.

{% highlight python %}
from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows
{% endhighlight %}
</div>

</div>

### IndexedRowMatrix

An `IndexedRowMatrix` is similar to a `RowMatrix` but with meaningful row indices. It is backed by
an RDD of indexed rows, so that each row is represented by its index (long-typed) and a local
vector.

<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -431,7 +456,48 @@ long n = mat.numCols();
// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">

An [`IndexedRowMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRowMatrix)
can be created from an `RDD` of `IndexedRow`s, where
[`IndexedRow`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.IndexedRow) is a
wrapper over `(long, vector)`. An `IndexedRowMatrix` can be converted to a `RowMatrix` by dropping
its row indices.

{% highlight python %}
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
# - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
# - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows() # 4
n = mat.numCols() # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()
{% endhighlight %}
</div>

</div>

### CoordinateMatrix

@@ -495,6 +561,42 @@ long n = mat.numCols();
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">

A [`CoordinateMatrix`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.CoordinateMatrix)
can be created from an `RDD` of `MatrixEntry` entries, where
[`MatrixEntry`](api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.MatrixEntry) is a
wrapper over `(long, long, float)`. A `CoordinateMatrix` can be converted to a `RowMatrix` by
calling `toRowMatrix`, or to an `IndexedRowMatrix` with sparse rows by calling `toIndexedRowMatrix`.

{% highlight python %}
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
# - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
# - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows() # 3
n = mat.numCols() # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()
{% endhighlight %}
</div>

</div>

### BlockMatrix
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.fpm.{FPGrowth, FPGrowthModel}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
import org.apache.spark.mllib.recommendation._
@@ -54,7 +55,7 @@ import org.apache.spark.mllib.tree.{DecisionTree, GradientBoostedTrees, RandomFo
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

@@ -1096,6 +1097,56 @@ private[python] class PythonMLLibAPI extends Serializable {
Statistics.kolmogorovSmirnovTest(data, distName, paramsSeq: _*)
}

/**
* Wrapper around RowMatrix constructor.
*/
def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = {
new RowMatrix(rows.rdd, numRows, numCols)
}

/**
* Wrapper around IndexedRowMatrix constructor.
*/
def createIndexedRowMatrix(rows: DataFrame, numRows: Long, numCols: Int): IndexedRowMatrix = {
// We use DataFrames for serialization of IndexedRows from Python,
// so map each Row in the DataFrame back to an IndexedRow.
val indexedRows = rows.map {
case Row(index: Long, vector: Vector) => IndexedRow(index, vector)
}
new IndexedRowMatrix(indexedRows, numRows, numCols)
}

/**
* Wrapper around CoordinateMatrix constructor.
*/
def createCoordinateMatrix(rows: DataFrame, numRows: Long, numCols: Long): CoordinateMatrix = {
// We use DataFrames for serialization of MatrixEntry entries from
// Python, so map each Row in the DataFrame back to a MatrixEntry.
val entries = rows.map {
case Row(i: Long, j: Long, value: Double) => MatrixEntry(i, j, value)
}
new CoordinateMatrix(entries, numRows, numCols)
}

/**
* Return the rows of an IndexedRowMatrix.
*/
def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
// We use DataFrames for serialization of IndexedRows to Python,
// so return a DataFrame.
val sqlContext = new SQLContext(indexedRowMatrix.rows.sparkContext)
sqlContext.createDataFrame(indexedRowMatrix.rows)
}

/**
* Return the entries of a CoordinateMatrix.
*/
def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = {
// We use DataFrames for serialization of MatrixEntry entries to
// Python, so return a DataFrame.
val sqlContext = new SQLContext(coordinateMatrix.entries.sparkContext)
sqlContext.createDataFrame(coordinateMatrix.entries)
}
}

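These wrappers cover only the JVM half of the rows/entries round trip: the getters serialize the data as a DataFrame, and the Python side still has to map each Row back to an `IndexedRow` or `MatrixEntry`. That consuming code lives in `pyspark.mllib.linalg.distributed` and is not part of this diff; a minimal sketch of what the Python half might look like (the `get_indexed_rows` helper and its `java_matrix` argument are illustrative, not taken from this PR):

```python
from pyspark.mllib.common import callMLlibFunc
from pyspark.mllib.linalg.distributed import IndexedRow

def get_indexed_rows(java_matrix):
    # getIndexedRows returns the matrix's rows serialized as a
    # DataFrame; callMLlibFunc converts the Java result back into
    # a PySpark DataFrame.
    rows_df = callMLlibFunc("getIndexedRows", java_matrix)
    # Map each (index, vector) Row back to an IndexedRow.
    return rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
```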
8 changes: 8 additions & 0 deletions python/docs/pyspark.mllib.rst
@@ -46,6 +46,14 @@ pyspark.mllib.linalg module
:undoc-members:
:show-inheritance:

pyspark.mllib.linalg.distributed module
---------------------------------------

.. automodule:: pyspark.mllib.linalg.distributed
:members:
:undoc-members:
:show-inheritance:

pyspark.mllib.random module
---------------------------

2 changes: 2 additions & 0 deletions python/pyspark/mllib/common.py
@@ -73,6 +73,8 @@ def _py2java(sc, obj):
""" Convert Python object into Java """
if isinstance(obj, RDD):
obj = _to_java_object_rdd(obj)
elif isinstance(obj, DataFrame):
obj = obj._jdf
elif isinstance(obj, SparkContext):
obj = obj._jsc
elif isinstance(obj, list):
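The new `DataFrame` branch is what lets the distributed-matrix constructors pass a DataFrame of serialized rows or entries through `callMLlibFunc` to the JVM wrappers above: `_py2java` substitutes the underlying `_jdf`, so the Scala side receives a plain `DataFrame`. A rough sketch of that flow for `IndexedRowMatrix` (assuming an active `SQLContext` so `RDD.toDF()` is available; the helper name is hypothetical):

```python
from pyspark.mllib.common import callMLlibFunc
from pyspark.mllib.linalg.distributed import IndexedRow

def create_indexed_row_matrix(rows, numRows=0, numCols=0):
    # Accept either IndexedRows or plain (long, vector) tuples.
    rows = rows.map(lambda r: r if isinstance(r, IndexedRow)
                    else IndexedRow(*r))
    # toDF() serializes the IndexedRows as a DataFrame; when that
    # DataFrame passes through _py2java, the new branch hands its
    # _jdf to the JVM, where createIndexedRowMatrix maps each Row
    # back to a Scala IndexedRow.
    return callMLlibFunc("createIndexedRowMatrix", rows.toDF(),
                         int(numRows), int(numCols))
```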