
Commit df8242c

Davies Liu authored and mengxr committed
[SPARK-4324] [PySpark] [MLlib] support numpy.array for all MLlib API
This PR checks all of the existing Python MLlib APIs to make sure that numpy.array is supported as a Vector (including RDDs of numpy.array). It also improves some docstrings and doctests.

cc mateiz mengxr

Author: Davies Liu <[email protected]>

Closes #3189 from davies/numpy and squashes the following commits:

d5057c4 [Davies Liu] fix tests
6987611 [Davies Liu] support numpy.array for all MLlib API

(cherry picked from commit 65083e9)
Signed-off-by: Xiangrui Meng <[email protected]>
1 parent 4eeaf33 commit df8242c
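
In practice, the change means a plain numpy.array is accepted anywhere MLlib expects a Vector. A minimal sketch of the new calling convention (assumes a live SparkContext `sc` and the Python 2 / Spark 1.x API of this branch; data values are illustrative):

import numpy
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

# LabeledPoint features and training RDDs may be numpy.array, not only MLlib Vectors
points = sc.parallelize([LabeledPoint(0.0, numpy.array([0.0, 1.0])),
                         LabeledPoint(1.0, numpy.array([1.0, 0.0]))])
model = LogisticRegressionWithSGD.train(points, iterations=10)
model.predict(numpy.array([1.0, 0.0]))  # predict() now converts numpy.array itself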

7 files changed, +105 −32 lines changed

python/pyspark/mllib/classification.py

Lines changed: 8 additions & 5 deletions
@@ -62,6 +62,7 @@ class LogisticRegressionModel(LinearModel):
     """
 
     def predict(self, x):
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self._intercept
         if margin > 0:
             prob = 1 / (1 + exp(-margin))
@@ -79,7 +80,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
         """
         Train a logistic regression model on the given data.
 
-        :param data:              The training data.
+        :param data:              The training data, an RDD of LabeledPoint.
         :param iterations:        The number of iterations (default: 100).
         :param step:              The step parameter used in SGD
                                   (default: 1.0).
@@ -136,6 +137,7 @@ class SVMModel(LinearModel):
     """
 
     def predict(self, x):
+        x = _convert_to_vector(x)
         margin = self.weights.dot(x) + self.intercept
         return 1 if margin >= 0 else 0
 
@@ -148,7 +150,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=1.0,
         """
         Train a support vector machine on the given data.
 
-        :param data:              The training data.
+        :param data:              The training data, an RDD of LabeledPoint.
        :param iterations:        The number of iterations (default: 100).
         :param step:              The step parameter used in SGD
                                   (default: 1.0).
@@ -233,11 +235,12 @@ def train(cls, data, lambda_=1.0):
         classification. By making every vector a 0-1 vector, it can also be
         used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
 
-        :param data: RDD of NumPy vectors, one per element, where the first
-               coordinate is the label and the rest is the feature vector
-               (e.g. a count vector).
+        :param data: RDD of LabeledPoint.
         :param lambda_: The smoothing parameter
         """
+        first = data.first()
+        if not isinstance(first, LabeledPoint):
+            raise ValueError("`data` should be an RDD of LabeledPoint")
         labels, pi, theta = callMLlibFunc("trainNaiveBayes", data, lambda_)
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
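
For illustration, the two behavioral changes above amount to this (a sketch, assuming a live SparkContext `sc`):

from numpy import array
from pyspark.mllib.classification import NaiveBayes, SVMWithSGD
from pyspark.mllib.regression import LabeledPoint

data = sc.parallelize([LabeledPoint(0.0, array([0.0, 1.0])),
                       LabeledPoint(1.0, array([1.0, 0.0]))])
model = SVMWithSGD.train(data, iterations=10)
model.predict(array([1.0, 0.0]))  # numpy.array converted via _convert_to_vector

# Raw feature vectors instead of LabeledPoint now fail fast on the Python side:
NaiveBayes.train(sc.parallelize([array([0.0, 1.0])]))
# ValueError: `data` should be an RDD of LabeledPoint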

python/pyspark/mllib/feature.py

Lines changed: 24 additions & 7 deletions
@@ -25,7 +25,7 @@
 
 from pyspark import RDD, SparkContext
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import Vectors
+from pyspark.mllib.linalg import Vectors, _convert_to_vector
 
 __all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
            'HashingTF', 'IDFModel', 'IDF', 'Word2Vec', 'Word2VecModel']
@@ -81,12 +81,16 @@ def transform(self, vector):
         """
         Applies unit length normalization on a vector.
 
-        :param vector: vector to be normalized.
+        :param vector: vector or RDD of vector to be normalized.
         :return: normalized vector. If the norm of the input is zero, it
                  will return the input vector.
         """
         sc = SparkContext._active_spark_context
         assert sc is not None, "SparkContext should be initialized first"
+        if isinstance(vector, RDD):
+            vector = vector.map(_convert_to_vector)
+        else:
+            vector = _convert_to_vector(vector)
         return callMLlibFunc("normalizeVector", self.p, vector)
 
 
@@ -95,8 +99,12 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer):
     Wrapper for the model in JVM
     """
 
-    def transform(self, dataset):
-        return self.call("transform", dataset)
+    def transform(self, vector):
+        if isinstance(vector, RDD):
+            vector = vector.map(_convert_to_vector)
+        else:
+            vector = _convert_to_vector(vector)
+        return self.call("transform", vector)
 
 
 class StandardScalerModel(JavaVectorTransformer):
@@ -109,7 +117,7 @@ def transform(self, vector):
         """
         Applies standardization transformation on a vector.
 
-        :param vector: Vector to be standardized.
+        :param vector: Vector or RDD of Vector to be standardized.
         :return: Standardized vector. If the variance of a column is zero,
                  it will return default `0.0` for the column with zero variance.
         """
@@ -154,6 +162,7 @@ def fit(self, dataset):
                         the transformation model.
         :return: a StandardScalarModel
         """
+        dataset = dataset.map(_convert_to_vector)
         jmodel = callMLlibFunc("fitStandardScaler", self.withMean, self.withStd, dataset)
         return StandardScalerModel(jmodel)
 
@@ -211,6 +220,8 @@ def transform(self, dataset):
         :param dataset: an RDD of term frequency vectors
         :return: an RDD of TF-IDF vectors
         """
+        if not isinstance(dataset, RDD):
+            raise TypeError("dataset should be an RDD of term frequency vectors")
         return JavaVectorTransformer.transform(self, dataset)
 
 
@@ -255,7 +266,9 @@ def fit(self, dataset):
 
         :param dataset: an RDD of term frequency vectors
         """
-        jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset)
+        if not isinstance(dataset, RDD):
+            raise TypeError("dataset should be an RDD of term frequency vectors")
+        jmodel = callMLlibFunc("fitIDF", self.minDocFreq, dataset.map(_convert_to_vector))
         return IDFModel(jmodel)
 
 
@@ -287,6 +300,8 @@ def findSynonyms(self, word, num):
 
         Note: local use only
         """
+        if not isinstance(word, basestring):
+            word = _convert_to_vector(word)
         words, similarity = self.call("findSynonyms", word, num)
         return zip(words, similarity)
 
@@ -374,9 +389,11 @@ def fit(self, data):
         """
         Computes the vector representation of each word in vocabulary.
 
-        :param data: training data. RDD of subtype of Iterable[String]
+        :param data: training data. RDD of list of string
        :return: Word2VecModel instance
         """
+        if not isinstance(data, RDD):
+            raise TypeError("data should be an RDD of list of string")
         jmodel = callMLlibFunc("trainWord2Vec", data, int(self.vectorSize),
                                float(self.learningRate), int(self.numPartitions),
                                int(self.numIterations), long(self.seed))
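
A sketch of the resulting behavior (assumes a live SparkContext `sc`; the result shown in the comment is illustrative):

from numpy import array
from pyspark.mllib.feature import IDF, Normalizer

nor = Normalizer(p=2)
nor.transform(array([3.0, 4.0]))                    # plain numpy.array -> DenseVector([0.6, 0.8])
nor.transform(sc.parallelize([array([3.0, 4.0])]))  # an RDD of numpy.array is accepted too

# IDF.fit() rejects non-RDD input up front instead of failing inside the JVM:
IDF().fit([array([1.0, 2.0])])
# TypeError: dataset should be an RDD of term frequency vectors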

python/pyspark/mllib/random.py

Lines changed: 43 additions & 2 deletions
@@ -52,6 +52,12 @@ def uniformRDD(sc, size, numPartitions=None, seed=None):
         C{RandomRDDs.uniformRDD(sc, n, p, seed)\
           .map(lambda v: a + (b - a) * v)}
 
+        :param sc: SparkContext used to create the RDD.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.
+
         >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
         >>> len(x)
         100
@@ -76,6 +82,12 @@ def normalRDD(sc, size, numPartitions=None, seed=None):
         C{RandomRDDs.normal(sc, n, p, seed)\
           .map(lambda v: mean + sigma * v)}
 
+        :param sc: SparkContext used to create the RDD.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
+
         >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
         >>> stats = x.stats()
         >>> stats.count()
@@ -93,6 +105,13 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
         Generates an RDD comprised of i.i.d. samples from the Poisson
         distribution with the input mean.
 
+        :param sc: SparkContext used to create the RDD.
+        :param mean: Mean, or lambda, for the Poisson distribution.
+        :param size: Size of the RDD.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
+
         >>> mean = 100.0
         >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
         >>> stats = x.stats()
@@ -104,7 +123,7 @@ def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
         >>> abs(stats.stdev() - sqrt(mean)) < 0.5
         True
         """
-        return callMLlibFunc("poissonRDD", sc._jsc, mean, size, numPartitions, seed)
+        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)
 
     @staticmethod
     @toArray
@@ -113,6 +132,13 @@ def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the uniform distribution U(0.0, 1.0).
 
+        :param sc: SparkContext used to create the RDD.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD.
+        :param seed: Seed for the RNG that generates the seed for the generator in each partition.
+        :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.
+
         >>> import numpy as np
         >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
         >>> mat.shape
@@ -131,6 +157,13 @@ def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the standard normal distribution.
 
+        :param sc: SparkContext used to create the RDD.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
+
         >>> import numpy as np
         >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
         >>> mat.shape
@@ -149,6 +182,14 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
         Generates an RDD comprised of vectors containing i.i.d. samples drawn
         from the Poisson distribution with the input mean.
 
+        :param sc: SparkContext used to create the RDD.
+        :param mean: Mean, or lambda, for the Poisson distribution.
+        :param numRows: Number of Vectors in the RDD.
+        :param numCols: Number of elements in each Vector.
+        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
+        :param seed: Random seed (default: a random long integer).
+        :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).
+
         >>> import numpy as np
         >>> mean = 100.0
         >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
@@ -161,7 +202,7 @@ def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
         >>> abs(mat.std() - sqrt(mean)) < 0.5
         True
         """
-        return callMLlibFunc("poissonVectorRDD", sc._jsc, mean, numRows, numCols,
+        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                              numPartitions, seed)
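
The two `float(mean)` coercions are the only behavioral edits here; the rest is docstring work. A sketch of what the cast buys (my reading, not stated in the diff: an integer `mean` would otherwise not match the Double parameter of the JVM method):

from pyspark.mllib.random import RandomRDDs

# an int mean is now coerced to float before crossing into the JVM
x = RandomRDDs.poissonRDD(sc, 100, 1000, seed=2L)
mat = RandomRDDs.poissonVectorRDD(sc, 100, 100, 100, seed=1L)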

python/pyspark/mllib/recommendation.py

Lines changed: 3 additions & 3 deletions
@@ -32,7 +32,7 @@ def __reduce__(self):
         return Rating, (self.user, self.product, self.rating)
 
     def __repr__(self):
-        return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+        return "Rating(%d, %d, %s)" % (self.user, self.product, self.rating)
 
 
 class MatrixFactorizationModel(JavaModelWrapper):
@@ -51,7 +51,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
     >>> testset = sc.parallelize([(1, 2), (1, 1)])
     >>> model = ALS.train(ratings, 1, seed=10)
     >>> model.predictAll(testset).collect()
-    [Rating(1, 1, 1), Rating(1, 2, 1)]
+    [Rating(1, 1, 1.0471...), Rating(1, 2, 1.9679...)]
 
     >>> model = ALS.train(ratings, 4, seed=10)
     >>> model.userFeatures().collect()
@@ -79,7 +79,7 @@ class MatrixFactorizationModel(JavaModelWrapper):
     0.4473...
     """
     def predict(self, user, product):
-        return self._java_model.predict(user, product)
+        return self._java_model.predict(int(user), int(product))
 
     def predictAll(self, user_product):
         assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
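
A sketch of what the `int()` casts and the `%s` repr fix buy (hypothetical usage; assumes a live SparkContext `sc`):

import numpy
from pyspark.mllib.recommendation import ALS, Rating

ratings = sc.parallelize([Rating(1, 1, 1.0), Rating(1, 2, 2.0),
                          Rating(2, 1, 2.0), Rating(2, 2, 5.0)])
model = ALS.train(ratings, 1, seed=10)
# numpy integer ids are now coerced with int() before reaching the JVM
model.predict(numpy.int64(1), numpy.int64(2))
# and Rating reprs no longer truncate float ratings to %d, as the
# updated predictAll doctest above shows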

python/pyspark/mllib/regression.py

Lines changed: 10 additions & 5 deletions
@@ -36,7 +36,7 @@ class LabeledPoint(object):
     """
 
     def __init__(self, label, features):
-        self.label = label
+        self.label = float(label)
         self.features = _convert_to_vector(features)
 
     def __reduce__(self):
@@ -46,7 +46,7 @@ def __str__(self):
         return "(" + ",".join((str(self.label), str(self.features))) + ")"
 
     def __repr__(self):
-        return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
+        return "LabeledPoint(%s, %s)" % (self.label, self.features)
 
 
 class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
 
     def __init__(self, weights, intercept):
         self._coeff = _convert_to_vector(weights)
-        self._intercept = intercept
+        self._intercept = float(intercept)
 
     @property
     def weights(self):
@@ -66,7 +66,7 @@ def intercept(self):
         return self._intercept
 
     def __repr__(self):
-        return "(weights=%s, intercept=%s)" % (self._coeff, self._intercept)
+        return "(weights=%s, intercept=%r)" % (self._coeff, self._intercept)
 
 
 class LinearRegressionModelBase(LinearModel):
@@ -85,6 +85,7 @@ def predict(self, x):
         Predict the value of the dependent variable given a vector x
         containing values for the independent variables.
         """
+        x = _convert_to_vector(x)
         return self.weights.dot(x) + self.intercept
 
 
@@ -124,6 +125,9 @@ class LinearRegressionModel(LinearRegressionModelBase):
 # return the result of a call to the appropriate JVM stub.
 # _regression_train_wrapper is responsible for setup and error checking.
 def _regression_train_wrapper(train_func, modelClass, data, initial_weights):
+    first = data.first()
+    if not isinstance(first, LabeledPoint):
+        raise ValueError("data should be an RDD of LabeledPoint, but got %s" % first)
     initial_weights = initial_weights or [0.0] * len(data.first().features)
     weights, intercept = train_func(_to_java_object_rdd(data, cache=True),
                                     _convert_to_vector(initial_weights))
@@ -264,7 +268,8 @@ def train(rdd, i):
 def _test():
     import doctest
     from pyspark import SparkContext
-    globs = globals().copy()
+    import pyspark.mllib.regression
+    globs = pyspark.mllib.regression.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
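
Roughly, the user-visible effect (a sketch assuming a live SparkContext `sc`; printed values are illustrative):

from numpy import array
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

LabeledPoint(1, array([1.0, 2.0]))   # label coerced: LabeledPoint(1.0, [1.0,2.0])

data = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0])])
model = LinearRegressionWithSGD.train(data, iterations=10)
model.predict(array([1.0]))          # numpy.array accepted by predict()

# training on anything other than LabeledPoint now fails fast:
LinearRegressionWithSGD.train(sc.parallelize([[0.0, 1.0]]))
# ValueError: data should be an RDD of LabeledPoint, but got [0.0, 1.0]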

python/pyspark/mllib/stat.py

Lines changed: 15 additions & 1 deletion
@@ -22,6 +22,7 @@
 from pyspark import RDD
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
 from pyspark.mllib.linalg import Matrix, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
 
 
 __all__ = ['MultivariateStatisticalSummary', 'ChiSqTestResult', 'Statistics']
@@ -107,6 +108,11 @@ def colStats(rdd):
         """
         Computes column-wise summary statistics for the input RDD[Vector].
 
+        :param rdd: an RDD[Vector] for which column-wise summary statistics
+                    are to be computed.
+        :return: :class:`MultivariateStatisticalSummary` object containing
+                 column-wise summary statistics.
+
         >>> from pyspark.mllib.linalg import Vectors
         >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
         ...                       Vectors.dense([4, 5, 0, 3]),
@@ -140,6 +146,13 @@ def corr(x, y=None, method=None):
         to specify the method to be used for single RDD inout.
         If two RDDs of floats are passed in, a single float is returned.
 
+        :param x: an RDD of vector for which the correlation matrix is to be computed,
+                  or an RDD of float of the same cardinality as y when y is specified.
+        :param y: an RDD of float of the same cardinality as x.
+        :param method: String specifying the method to use for computing correlation.
+                       Supported: `pearson` (default), `spearman`
+        :return: Correlation matrix comparing columns in x.
+
         >>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
         >>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
         >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
@@ -242,7 +255,6 @@ def chiSqTest(observed, expected=None):
         >>> print round(chi.statistic, 4)
         21.9958
 
-        >>> from pyspark.mllib.regression import LabeledPoint
         >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
         ...         LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
         ...         LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
@@ -257,6 +269,8 @@ def chiSqTest(observed, expected=None):
         1.5
         """
         if isinstance(observed, RDD):
+            if not isinstance(observed.first(), LabeledPoint):
+                raise ValueError("observed should be an RDD of LabeledPoint")
             jmodels = callMLlibFunc("chiSqTest", observed)
             return [ChiSqTestResult(m) for m in jmodels]
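
For instance (a sketch; assumes a live SparkContext `sc`):

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics

rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
                      Vectors.dense([4, 5, 0, 3])])
Statistics.colStats(rdd).mean()     # column-wise means as a numpy array

# chiSqTest now validates the element type of an RDD argument up front:
Statistics.chiSqTest(sc.parallelize([[0.5, 10.0]]))
# ValueError: observed should be an RDD of LabeledPoint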
