@@ -74,6 +74,15 @@ private[python] class PythonMLLibAPI extends Serializable {
      minPartitions: Int): JavaRDD[LabeledPoint] =
    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)

  /**
   * Loads and serializes vectors saved with `RDD#saveAsTextFile`.
   * @param jsc Java SparkContext
   * @param path file or directory path in any Hadoop-supported file system URI
   * @return serialized vectors in an RDD
   */
  def loadVectors(jsc: JavaSparkContext, path: String): RDD[Vector] =
    MLUtils.loadVectors(jsc.sc, path)

  private def trainRegressionModel(
      learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
      data: JavaRDD[LabeledPoint],
44 changes: 43 additions & 1 deletion python/pyspark/mllib/tests.py
@@ -24,7 +24,6 @@
import tempfile
import array as pyarray
from time import time, sleep

from numpy import array, array_equal, zeros, inf, all, random
from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
@@ -50,6 +49,7 @@
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.util import MLUtils
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
@@ -1011,6 +1011,48 @@ def collect(rdd):
        self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])


class MLUtilsTests(MLlibTestCase):
    def test_append_bias(self):
        data = [2.0, 2.0, 2.0]
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_vector(self):
        data = Vectors.dense([2.0, 2.0, 2.0])
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_sp_vector(self):
        data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
        expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
        # Returned value must be SparseVector
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret, expected)
        self.assertEqual(type(ret), SparseVector)

    def test_load_vectors(self):
        import shutil
        data = [
            [1.0, 2.0, 3.0],
            [1.0, 2.0, 3.0]
        ]
        temp_dir = tempfile.mkdtemp()
        load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
        try:
            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
            ret = ret_rdd.collect()
Member:
Order of collect() is not guaranteed, so please sort "ret" and "data" and then compare to make the test robust.

Member:
Oops, I guess this didn't matter; I didn't notice the vectors were identical. Fine to keep it sorted though.
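A minimal sketch of the order-independent comparison being suggested, assuming it replaces the element-wise assertions below inside test_load_vectors:

# Hypothetical: convert each loaded vector back to a plain list and sort both
# sides, so the check no longer depends on the partition order of collect().
ret_sorted = sorted(v.toArray().tolist() for v in ret)
self.assertEqual(ret_sorted, sorted(data))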

            self.assertEqual(len(ret), 2)
            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
        except:
            self.fail()
Contributor Author:
To confirm the existence of vectors exported with saveAsTextFile, the directory should be removed each time this test runs.

Member:
Sorry, I misunderstood; please ignore my comment.

        finally:
            shutil.rmtree(load_vectors_path)


if __name__ == "__main__":
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
30 changes: 29 additions & 1 deletion python/pyspark/mllib/util.py
@@ -18,12 +18,18 @@
import sys
import numpy as np
import warnings
try:
    import scipy.sparse
    _have_scipy = True
except:
    # No SciPy in environment, but that's okay
    _have_scipy = False

if sys.version > '3':
    xrange = range

from pyspark.mllib.common import callMLlibFunc, inherit_doc
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
from pyspark.mllib.linalg import Vector, Vectors, SparseVector, _convert_to_vector


class MLUtils(object):
@@ -169,6 +175,28 @@ def loadLabeledPoints(sc, path, minPartitions=None):
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)

    @staticmethod
    def appendBias(data):
Member:
Since the Scala version only operates on individual vectors, this one should not be a wrapper; it should do everything in Python. The reason is that callMLlibFunc requires the SparkContext and needs to operate on the driver. But since appendBias operates per-Row, it needs to be called on workers.

Also, please add doc. Feel free to copy from Scala doc.
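A hypothetical illustration of the constraint described above: appendBias is typically applied per record inside an RDD transformation, which executes on the workers, where no SparkContext (and hence no callMLlibFunc) is available. Here `vectors_rdd` is an assumed RDD of vectors.

# Hypothetical worker-side use: the lambda runs on executors, so appendBias
# must be implemented in pure Python rather than as a driver-side JVM call.
with_bias = vectors_rdd.map(lambda v: MLUtils.appendBias(v))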

"""
Returns a new vector with `1.0` (bias) appended to
the end of the input vector.
"""
vec = _convert_to_vector(data)
Member:
Will this work if "data" is a Vector type? _convert_to_vector will leave it as a Vector type.

        if isinstance(vec, SparseVector):
            l = scipy.sparse.csc_matrix(np.append(vec.toArray(), 1.0))
Member:
You won't be able to use scipy here since the user might not have it available (but I also don't think there's a need to). It should be straightforward to get the size, indices, and values, modify them to append a 1.0, and create a new SparseVector.
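A sketch of the scipy-free construction being suggested, assuming `vec` is the converted input from the code under review:

# Hypothetical sparse branch without scipy: grow the size by one and append
# the bias term as a new (index, value) pair, keeping the result sparse.
if isinstance(vec, SparseVector):
    return SparseVector(vec.size + 1,
                        np.append(vec.indices, vec.size),
                        np.append(vec.values, 1.0))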

            return _convert_to_vector(l.T)
        elif isinstance(vec, Vector):
Member:
Change to else, and indent the return statement to make this easier to read.

            vec = vec.toArray()
Member:
This is going to make sparse vectors become dense. It should treat SparseVector and scipy.sparse column matrices specially so data size does not blow up. There are probably numpy or scipy methods which could help with that.

            return _convert_to_vector(np.append(vec, 1.0).tolist())
Member:
Do you have to convert to a list?
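A minimal fragment folding in the two comments above (plain else, no intermediate list), assuming the same `vec` as in the code under review:

# Hypothetical dense/fallback branch: append 1.0 to the numpy array and
# convert straight back to a vector, without tolist().
else:
    return _convert_to_vector(np.append(vec.toArray(), 1.0))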


    @staticmethod
    def loadVectors(sc, path):
Member:
Please add doc. Feel free to copy from Scala doc

"""
Loads vectors saved using `RDD[Vector].saveAsTextFile`
with the default number of partitions.
"""
return callMLlibFunc("loadVectors", sc, path)
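A hypothetical round trip with the new Python helper, assuming an active SparkContext `sc`; the output path is made up for illustration:

# Hypothetical usage: vectors written with saveAsTextFile can be read back
# as an RDD of vectors through MLUtils.loadVectors.
sc.parallelize([Vectors.dense([1.0, 2.0, 3.0])]).saveAsTextFile("/tmp/vectors_out")
loaded = MLUtils.loadVectors(sc, "/tmp/vectors_out").collect()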


class Saveable(object):
"""