@@ -3,6 +3,9 @@ layout: global
title: Machine Learning Library (MLlib)
---

+ * Table of contents
+ {:toc}
+
MLlib is a Spark implementation of some common machine learning (ML)
functionality, as well as associated tests and data generators. MLlib
currently supports four common types of machine learning problem settings,
@@ -39,57 +42,6 @@ underlying gradient descent primitive (described
parameter (*regParam*) along with various parameters associated with gradient
descent (*stepSize*, *numIterations*, *miniBatchFraction*).

- The following code snippet illustrates how to load a sample dataset, execute a
- training algorithm on this training data using a static method in the algorithm
- object, and make predictions with the resulting model to compute the training
- error.
-
- {% highlight scala %}
- import org.apache.spark.SparkContext
- import org.apache.spark.mllib.classification.SVMWithSGD
- import org.apache.spark.mllib.regression.LabeledPoint
-
- // Load and parse the data file
- val data = sc.textFile("mllib/data/sample_svm_data.txt")
- val parsedData = data.map { line =>
-   val parts = line.split(' ')
-   LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
- }
-
- // Run training algorithm
- val numIterations = 20
- val model = SVMWithSGD.train(parsedData, numIterations)
-
- // Evaluate model on training examples and compute training error
- val labelAndPreds = parsedData.map { point =>
-   val prediction = model.predict(point.features)
-   (point.label, prediction)
- }
- val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
- println("trainError = " + trainErr)
- {% endhighlight %}
-
- The `SVMWithSGD.train()` method by default performs L2 regularization with the
- regularization parameter set to 1.0. If we want to configure this algorithm, we
- can customize `SVMWithSGD` further by creating a new object directly and
- calling setter methods. All other MLlib algorithms support customization in
- this way as well. For example, the following code produces an L1 regularized
- variant of SVMs with regularization parameter set to 0.1, and runs the training
- algorithm for 200 iterations.
-
- {% highlight scala %}
- import org.apache.spark.mllib.optimization.L1Updater
-
- val svmAlg = new SVMWithSGD()
- svmAlg.optimizer.setNumIterations(200)
-   .setRegParam(0.1)
-   .setUpdater(new L1Updater)
- val modelL1 = svmAlg.run(parsedData)
- {% endhighlight %}
-
- Both of the code snippets above can be executed in `bin/spark-shell` to generate a
- classifier for the provided dataset.
-
Available algorithms for binary classification:

* [SVMWithSGD](api/mllib/index.html#org.apache.spark.mllib.classification.SVMWithSGD)
@@ -121,14 +73,14 @@ of entities with one another based on some notion of similarity. Clustering is
often used for exploratory analysis and/or as a component of a hierarchical
supervised learning pipeline (in which distinct classifiers or regression
models are trained for each cluster). MLlib supports
- [k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, arguably
- the most commonly used clustering approach that clusters the data points into
- *k* clusters. The MLlib implementation includes a parallelized
+ [k-means](http://en.wikipedia.org/wiki/K-means_clustering) clustering, one of
+ the most commonly used clustering algorithms, which clusters the data points into a
+ predefined number of clusters. The MLlib implementation includes a parallelized
variant of the [k-means++](http://en.wikipedia.org/wiki/K-means%2B%2B) method
called [kmeans||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf).
The implementation in MLlib has the following parameters:

- * *k* is the number of clusters.
+ * *k* is the number of desired clusters.
* *maxIterations* is the maximum number of iterations to run.
* *initializationMode* specifies either random initialization or
initialization via k-means\|\|.
@@ -169,7 +121,7 @@ the entries in the user-item matrix as *explicit* preferences given by the user
It is common in many real-world use cases to only have access to *implicit feedback*
(e.g. views, clicks, purchases, likes, shares etc.). The approach used in MLlib to deal with
such data is taken from
- [Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433).
+ [Collaborative Filtering for Implicit Feedback Datasets](http://www2.research.att.com/~yifanhu/PUB/cf.pdf).
Essentially, instead of trying to model the matrix of ratings directly, this approach treats the data as
a combination of binary preferences and *confidence values*. The ratings are then related
to the level of confidence in observed user preferences, rather than explicit ratings given to items.
@@ -210,3 +162,269 @@ at each iteration.
Available algorithms for gradient descent:

* [GradientDescent](api/mllib/index.html#org.apache.spark.mllib.optimization.GradientDescent)
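+
+ As a concrete illustration of how these gradient descent parameters are exposed, here is a
+ minimal sketch (assuming the `SVMWithSGD` class used later in this guide; the same pattern
+ applies to the other SGD-based algorithms). Each algorithm carries an `optimizer` whose
+ setters are assumed here to correspond to the parameters described above.
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.classification.SVMWithSGD
+
+ // Sketch: tune the underlying gradient descent through the algorithm's optimizer.
+ // The setter names below are assumed to mirror the parameters described in this section.
+ val alg = new SVMWithSGD()
+ alg.optimizer
+   .setStepSize(1.0)           // gradient descent step size
+   .setNumIterations(100)      // number of gradient descent iterations
+   .setMiniBatchFraction(1.0)  // fraction of the data sampled at each iteration
+   .setRegParam(0.1)           // regularization parameter
+ {% endhighlight %}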
+
+ # Using MLlib in Scala
+
+ The following code snippets can be executed in `spark-shell`.
+
+ ## Binary Classification
+
+ The following code snippet illustrates how to load a sample dataset, execute a
+ training algorithm on this training data using a static method in the algorithm
+ object, and make predictions with the resulting model to compute the training
+ error.
+
+ {% highlight scala %}
+ import org.apache.spark.SparkContext
+ import org.apache.spark.mllib.classification.SVMWithSGD
+ import org.apache.spark.mllib.regression.LabeledPoint
+
+ // Load and parse the data file
+ val data = sc.textFile("mllib/data/sample_svm_data.txt")
+ val parsedData = data.map { line =>
+   val parts = line.split(' ')
+   LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
+ }
+
+ // Run training algorithm to build the model
+ val numIterations = 20
+ val model = SVMWithSGD.train(parsedData, numIterations)
+
+ // Evaluate model on training examples and compute training error
+ val labelAndPreds = parsedData.map { point =>
+   val prediction = model.predict(point.features)
+   (point.label, prediction)
+ }
+ val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
+ println("Training Error = " + trainErr)
+ {% endhighlight %}
+
+ The `SVMWithSGD.train()` method by default performs L2 regularization with the
+ regularization parameter set to 1.0. If we want to configure this algorithm, we
+ can customize `SVMWithSGD` further by creating a new object directly and
+ calling setter methods. All other MLlib algorithms support customization in
+ this way as well. For example, the following code produces an L1 regularized
+ variant of SVMs with regularization parameter set to 0.1, and runs the training
+ algorithm for 200 iterations.
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.optimization.L1Updater
+
+ val svmAlg = new SVMWithSGD()
+ svmAlg.optimizer.setNumIterations(200)
+   .setRegParam(0.1)
+   .setUpdater(new L1Updater)
+ val modelL1 = svmAlg.run(parsedData)
+ {% endhighlight %}
+
+ ## Linear Regression
+ The following example demonstrates how to load training data and parse it as an RDD of
+ LabeledPoint. The example then uses LinearRegressionWithSGD to build a simple linear model to
+ predict label values. We compute the Mean Squared Error at the end to evaluate
+ [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.regression.LinearRegressionWithSGD
+ import org.apache.spark.mllib.regression.LabeledPoint
+
+ // Load and parse the data
+ val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
+ val parsedData = data.map { line =>
+   val parts = line.split(',')
+   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
+ }
+
+ // Build the model
+ val numIterations = 20
+ val model = LinearRegressionWithSGD.train(parsedData, numIterations)
+
+ // Evaluate model on training examples and compute training error
+ val valuesAndPreds = parsedData.map { point =>
+   val prediction = model.predict(point.features)
+   (point.label, prediction)
+ }
+ val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.reduce(_ + _) / valuesAndPreds.count
+ println("Training Mean Squared Error = " + MSE)
+ {% endhighlight %}
+
+
+ Similarly, you can use RidgeRegressionWithSGD and LassoWithSGD and compare training
+ [Mean Squared Errors](http://en.wikipedia.org/wiki/Mean_squared_error); a sketch follows.
+
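+ A minimal sketch of that comparison (reusing `parsedData` and `numIterations` from the example
+ above, and assuming `RidgeRegressionWithSGD.train` and `LassoWithSGD.train` accept the same
+ `(data, numIterations)` arguments as `LinearRegressionWithSGD.train`):
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.regression.{LassoWithSGD, RidgeRegressionWithSGD}
+
+ // Train ridge and lasso models on the same data
+ // (sketch; the train helpers are assumed to mirror LinearRegressionWithSGD.train)
+ val ridgeModel = RidgeRegressionWithSGD.train(parsedData, numIterations)
+ val lassoModel = LassoWithSGD.train(parsedData, numIterations)
+
+ // Reuse the same training MSE computation for each model to compare the fits
+ val ridgeMSE = parsedData.map { p =>
+   math.pow(p.label - ridgeModel.predict(p.features), 2)
+ }.reduce(_ + _) / parsedData.count
+ val lassoMSE = parsedData.map { p =>
+   math.pow(p.label - lassoModel.predict(p.features), 2)
+ }.reduce(_ + _) / parsedData.count
+ {% endhighlight %}
+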
+ ## Clustering
+ In the following example, after loading and parsing data, we use the KMeans object to cluster the data
+ into two clusters. The number of desired clusters is passed to the algorithm. We then compute the Within
+ Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact, the
+ optimal *k* is usually the one where there is an "elbow" in the WSSSE graph, as sketched after the
+ example below.
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.clustering.KMeans
+
+ // Load and parse the data
+ val data = sc.textFile("kmeans_data.txt")
+ val parsedData = data.map(_.split(' ').map(_.toDouble))
+
+ // Cluster the data into two classes using KMeans
+ val numIterations = 20
+ val numClusters = 2
+ val clusters = KMeans.train(parsedData, numClusters, numIterations)
+
+ // Evaluate clustering by computing Within Set Sum of Squared Errors
+ val WSSSE = clusters.computeCost(parsedData)
+ println("Within Set Sum of Squared Errors = " + WSSSE)
+ {% endhighlight %}
+
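+ To look for such an "elbow", one simple sketch (reusing `parsedData` and `numIterations` from
+ above, and only the `KMeans.train` and `computeCost` calls already shown) is to train over a
+ small range of *k* values and print each WSSSE:
+
+ {% highlight scala %}
+ // Sketch: sweep k and print the cost, so the "elbow" can be read off the output
+ (1 to 10).foreach { k =>
+   val model = KMeans.train(parsedData, k, numIterations)
+   println("k = " + k + ", WSSSE = " + model.computeCost(parsedData))
+ }
+ {% endhighlight %}
+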
+ ## Collaborative Filtering
+ In the following example, we load rating data. Each row consists of a user, a product and a rating.
+ We use the default ALS.train() method, which assumes ratings are explicit. We evaluate the
+ recommendation model by measuring the Mean Squared Error of rating prediction.
+
+ {% highlight scala %}
+ import org.apache.spark.mllib.recommendation.ALS
+ import org.apache.spark.mllib.recommendation.Rating
+
+ // Load and parse the data
+ val data = sc.textFile("mllib/data/als/test.data")
+ val ratings = data.map(_.split(',') match {
+   case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
+ })
+
+ // Build the recommendation model using ALS
+ val rank = 1
+ val numIterations = 20
+ val model = ALS.train(ratings, rank, numIterations, 0.01)
+
+ // Evaluate the model on rating data
+ val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
+ val predictions = model.predict(usersProducts).map {
+   case Rating(user, product, rate) => ((user, product), rate)
+ }
+ val ratesAndPreds = ratings.map {
+   case Rating(user, product, rate) => ((user, product), rate)
+ }.join(predictions)
+ val MSE = ratesAndPreds.map {
+   case ((user, product), (r1, r2)) => math.pow((r1 - r2), 2)
+ }.reduce(_ + _) / ratesAndPreds.count
+ println("Mean Squared Error = " + MSE)
+ {% endhighlight %}
+
+ If the rating matrix is derived from another source of information (i.e., it is inferred from
+ other signals), you can use the `trainImplicit` method to get better results.
+
+ {% highlight scala %}
+ val model = ALS.trainImplicit(ratings, rank, numIterations, 0.01)
+ {% endhighlight %}
+
+ # Using MLlib in Python
+ The following examples can be tested in the PySpark shell.
+
+ ## Binary Classification
+ The following example shows how to load a sample dataset, build a Logistic Regression model,
+ and make predictions with the resulting model to compute the training error.
+
+ {% highlight python %}
+ from pyspark.mllib.classification import LogisticRegressionWithSGD
+ from numpy import array
+
+ # Load and parse the data
+ data = sc.textFile("mllib/data/sample_svm_data.txt")
+ parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
+
+ # Build the model
+ model = LogisticRegressionWithSGD.train(sc, parsedData)
+
+ # Evaluate the model on training data
+ labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
+         model.predict(point.take(range(1, point.size)))))
+ trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
+ print("Training Error = " + str(trainErr))
+ {% endhighlight %}
+
+ ## Linear Regression
+ The following example demonstrates how to load training data and parse it as an RDD of
+ labeled points. The example then uses LinearRegressionWithSGD to build a simple linear model to
+ predict label values. We compute the Mean Squared Error at the end to evaluate
+ [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
+
+ {% highlight python %}
+ from pyspark.mllib.regression import LinearRegressionWithSGD
+ from numpy import array
+
+ # Load and parse the data
+ data = sc.textFile("mllib/data/ridge-data/lpsa.data")
+ parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
+
+ # Build the model
+ model = LinearRegressionWithSGD.train(sc, parsedData)
+
+ # Evaluate the model on training data
+ valuesAndPreds = parsedData.map(lambda point: (point.item(0),
+         model.predict(point.take(range(1, point.size)))))
+ MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
+ print("Mean Squared Error = " + str(MSE))
+ {% endhighlight %}
+
+ Similarly, you can use RidgeRegressionWithSGD and LassoWithSGD and compare training Mean Squared
+ Errors.
+
+ ## Clustering
+ In the following example, after loading and parsing data, we use the KMeans object to cluster the data
+ into two clusters. The number of desired clusters is passed to the algorithm. We then compute the Within
+ Set Sum of Squared Error (WSSSE). You can reduce this error measure by increasing *k*. In fact, the
+ optimal *k* is usually the one where there is an "elbow" in the WSSSE graph.
+
+ {% highlight python %}
+ from pyspark.mllib.clustering import KMeans
+ from numpy import array
+ from math import sqrt
+
+ # Load and parse the data
+ data = sc.textFile("kmeans_data.txt")
+ parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
+
+ # Build the model (cluster the data)
+ clusters = KMeans.train(sc, parsedData, 2, maxIterations=10,
+         runs=30, initialization_mode="random")
+
+ # Evaluate clustering by computing Within Set Sum of Squared Errors
+ def error(point):
+     center = clusters.centers[clusters.predict(point)]
+     return sqrt(sum([x**2 for x in (point - center)]))
+
+ WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
+ print("Within Set Sum of Squared Error = " + str(WSSSE))
+ {% endhighlight %}
+
+ ## Collaborative Filtering
+ In the following example, we load rating data. Each row consists of a user, a product and a rating.
+ We use the default ALS.train() method, which assumes ratings are explicit. We evaluate the
+ recommendation model by measuring the Mean Squared Error of rating prediction.
+
+ {% highlight python %}
+ from pyspark.mllib.recommendation import ALS
+ from numpy import array
+
+ # Load and parse the data
+ data = sc.textFile("mllib/data/als/test.data")
+ ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))
+
+ # Build the recommendation model using Alternating Least Squares
+ model = ALS.train(sc, ratings, 1, 20)
+
+ # Evaluate the model on training data
+ testdata = ratings.map(lambda p: (int(p[0]), int(p[1])))
+ predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
+ ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
+ MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y) / ratesAndPreds.count()
+ print("Mean Squared Error = " + str(MSE))
+ {% endhighlight %}
+
+ If the rating matrix is derived from another source of information (i.e., it is inferred from other
+ signals), you can use the `trainImplicit` method to get better results.
+
+ {% highlight python %}
+ # Build the recommendation model using Alternating Least Squares based on implicit ratings
+ model = ALS.trainImplicit(sc, ratings, 1, 20)
+ {% endhighlight %}