Changes from 5 commits
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

@@ -71,6 +71,16 @@ private[python] class PythonMLLibAPI extends Serializable {
      minPartitions: Int): JavaRDD[LabeledPoint] =
    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)

  /**
   * Loads and serializes vectors saved with `RDD#saveAsTextFile`.
   * @param jsc Java SparkContext
   * @param path file or directory path in any Hadoop-supported file system URI
   * @return serialized vectors in an RDD
   */
  def loadVectors(jsc: JavaSparkContext,
Review comment (Member): scala style: If this method declaration will fit on 1 line, then please do so. (I think it will.) Otherwise, the parameters should start in the line below the method name and be indented 4 spaces.
      path: String): RDD[Vector] =
    MLUtils.loadVectors(jsc.sc, path)

  private def trainRegressionModel(
      learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
      data: JavaRDD[LabeledPoint],
python/pyspark/mllib/tests.py (27 additions, 0 deletions)

@@ -46,6 +46,7 @@
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.util import MLUtils
from pyspark.serializers import PickleSerializer
from pyspark.sql import SQLContext

@@ -818,6 +819,32 @@ def test_model_transform(self):
        self.assertEqual(model.transform([1.0, 2.0, 3.0]), DenseVector([1.0, 2.0, 3.0]))


class MLUtilsTests(MLlibTestCase):
    def test_append_bias(self):
        data = [2.0, 2.0, 2.0]
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)

    def test_load_vectors(self):
        import shutil
        data = [
            [1.0, 2.0, 3.0],
            [1.0, 2.0, 3.0]
        ]
        try:
            temp_dir = tempfile.mkdtemp()
            load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
            ret = ret_rdd.collect()
Review comment (Member): Order of collect() is not guaranteed, so please sort "ret" and "data" and then compare to make the test robust.

Review comment (Member): Oops, I guess this didn't matter; I didn't notice the vectors were identical. Fine to keep it sorted though.

(An order-insensitive comparison helper is sketched after this file's diff.)
            ret.sort()
            self.assertEqual(len(ret), 2)
            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
        finally:
            shutil.rmtree(load_vectors_path)


if __name__ == "__main__":
    if not _have_scipy:
        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
python/pyspark/mllib/util.py (16 additions, 0 deletions)

@@ -169,6 +169,22 @@ def loadLabeledPoints(sc, path, minPartitions=None):
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)

    @staticmethod
    def appendBias(data):
Review comment (Member): Since the Scala version only operates on individual vectors, this one should not be a wrapper; it should do everything in Python. The reason is that callMLlibFunc requires the SparkContext and needs to operate on the driver. But since appendBias operates per-Row, it needs to be called on workers.

Also, please add doc. Feel free to copy from Scala doc.

(A worker-side usage sketch follows this method's body.)

"""
Returns a new vector with `1.0` (bias) appended to the input vector.
Review comment (Member): As long as we're at it, could you please make this more specific: "appended to the end of the input vector". And could you please add it to the Scala version doc too?
"""
vec = _convert_to_vector(data)
Review comment (Member): Will this work if "data" is a Vector type? _convert_to_vector will leave it as a Vector type.
        return np.append(vec, 1.0)
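
Two points from the threads above can be made concrete. Because this appendBias is pure Python (NumPy only, no callMLlibFunc), it can be shipped to executors inside closures such as map(), which a py4j-backed wrapper could not be, since callMLlibFunc needs the driver's SparkContext. Separately, np.append densifies whatever it is given, so a SparseVector input would come back as a dense NumPy array; keeping it sparse would need its own branch. A minimal sketch under those assumptions (the SparkContext setup and append_bias_sparse are illustrative, not the PR's code):

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.util import MLUtils

sc = SparkContext("local[2]", "appendBias-sketch")  # illustrative setup

# Worker-side use: appendBias runs per record on the executors,
# which a driver-bound callMLlibFunc wrapper could not do.
biased = sc.parallelize([[1.0, 2.0], [3.0, 4.0]]).map(MLUtils.appendBias)
print(biased.collect())  # e.g. [array([1., 2., 1.]), array([3., 4., 1.])]

# Illustrative sparse-preserving variant: extend the indices/values
# arrays rather than letting np.append densify the vector.
def append_bias_sparse(v):
    if isinstance(v, SparseVector):
        return SparseVector(len(v) + 1,
                            np.append(v.indices, len(v)),
                            np.append(v.values, 1.0))
    return np.append(v, 1.0)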

    @staticmethod
    def loadVectors(sc, path):
Review comment (Member): Please add doc. Feel free to copy from Scala doc.

"""
Loads vectors saved using `RDD[Vector].saveAsTextFile`
with the default number of partitions.
"""
return callMLlibFunc("loadVectors", sc, path)


class Saveable(object):
"""
Expand Down