[SPARK-4118] [MLlib] [PySpark] Python bindings for StreamingKMeans #6499
Changes from 9 commits: d8b066a, 4b1481f, ee8ce16, c80e451, a9817df, 8ab9e89, 5d9fe61, 81482fd, 2061a76, 51052d3, 7722d16
@@ -592,15 +592,55 @@ model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
First we import the necessary classes.

{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
{% endhighlight %}

Then we make an input stream of vectors for training, as well as a stream of labeled data
points for testing. We assume a StreamingContext `ssc` has been created; see the
[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
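
If you are following along interactively, one possible way to create `ssc` (a minimal sketch; the app name and the 1-second batch interval are arbitrary choices, not part of this example):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="StreamingKMeansExample")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval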

{% highlight python %}
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
testData = ssc.textFileStream("/testing/data/dir").map(parse)
{% endhighlight %}

We create a model with random clusters and specify the number of clusters to find.

{% highlight python %}
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
{% endhighlight %}

Now register the streams for training and testing and start the job, printing
the predicted cluster assignments on new data points as they arrive.

{% highlight python %}
model.trainOn(trainingData)
model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features)))

**Contributor:** There is no output from this example. Call …
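
A minimal sketch of the fix this comment points at, assuming PySpark Streaming's `DStream.pprint()` (the counterpart of `.print()` in the Scala example above):

    model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))).pprint()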

ssc.start()
ssc.awaitTermination()
{% endhighlight %}
</div>

</div>

As you add new text files with data the cluster centers will update. Each training
point should be formatted as `[x1, x2, x3]`, and each test data point
should be formatted as `(y, [x1, x2, x3])`, where `y` is some useful label or identifier
(e.g. a true category assignment). Any time a text file is placed in `/training/data/dir`
the model will update, and any time a text file is placed in `/testing/data/dir`
you will see predictions. With new data, the cluster centers will change!

</div>

</div>
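
To make those file formats concrete, here is a small sketch (not part of the PR) showing how one training line and one testing line are parsed, using the `parse` helper and `Vectors.parse` from the example above:

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import LabeledPoint

    def parse(lp):
        label = float(lp[lp.find('(') + 1: lp.find(',')])
        vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
        return LabeledPoint(label, vec)

    print(Vectors.parse("[1.0, 2.0, 3.0]"))  # a training line: a bare vector
    print(parse("(2.0, [1.0, 2.0, 3.0])"))   # a testing line: (label, vector)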
@@ -964,6 +964,21 @@ private[python] class PythonMLLibAPI extends Serializable {
      points.asScala.toArray)
  }

  /**
   * Java stub for the update method of StreamingKMeansModel.
   */
  def updateStreamingKMeansModel(
      clusterCenters: JList[Vector],
      clusterWeights: JList[Double],
      data: JavaRDD[Vector],
      decayFactor: Double,
      timeUnit: String): JList[Object] = {
    val model = new StreamingKMeansModel(
      clusterCenters.asScala.toArray, clusterWeights.asScala.toArray)
      .update(data, decayFactor, timeUnit)

**Contributor:** Fix indentation.

    List[AnyRef](model.clusterCenters, Vectors.dense(model.clusterWeights)).asJava
  }

}
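
The stub's return contract is simply a two-element list: the updated centers, and the updated weights packed into a dense vector. A plain-Python mock (illustrative values only, not the real Py4J objects) of how the caller unpacks it, mirroring `StreamingKMeansModel.update` in clustering.py below:

    from numpy import array

    # Mock of what updateStreamingKMeansModel hands back over Py4J:
    updatedModel = [[[0.0, 0.0], [1.0, 1.0]],  # model.clusterCenters
                    [3.0, 3.0]]                # Vectors.dense(model.clusterWeights)

    centers = array(updatedModel[0])  # becomes self.centers
    weights = list(updatedModel[1])   # becomes self._clusterWeights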
@@ -21,14 +21,17 @@
if sys.version > '3':
    xrange = range

from math import exp, log

from numpy import array, random, tile

from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc
from pyspark.streaming import DStream

__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']

**Contributor:** import …
@@ -98,6 +101,9 @@ def predict(self, x):
        """Find the cluster to which x belongs in this model."""
        best = 0
        best_distance = float("inf")
        if isinstance(x, RDD):
            return x.map(self.predict)

        x = _convert_to_vector(x)
        for i in xrange(len(self.centers)):
            distance = x.squared_distance(self.centers[i])
@@ -264,6 +270,190 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia
        return GaussianMixtureModel(weight, mvg_obj)

class StreamingKMeansModel(KMeansModel):
    """
    .. note:: Experimental

    Clustering model which can perform an online update of the centroids.

    The update formula for each centroid is given by

    * c_t+1 = ((c_t * n_t * a) + (x_t * m_t)) / (n_t * a + m_t)
    * n_t+1 = n_t * a + m_t

    where

    * c_t: Centroid at the t_th iteration.

**Contributor:** Please run … Also, the generated doc doesn't show a list of bullets here; please check the Sphinx doc. Btw, we don't recommend vertical alignment, because it is hard to update.
    * n_t: Number of samples (or weights) associated with the centroid
      at the t_th iteration.
    * x_t: Centroid of the new data closest to c_t.
    * m_t: Number of samples (or weights) of the new data closest to c_t.
    * c_t+1: New centroid.
    * n_t+1: New number of weights.
    * a: Decay factor, which gives the forgetfulness.

    Note that if a is set to 1, it is the weighted mean of the previous
    and new data. If it is set to zero, the old centroids are completely
    forgotten.

    >>> initCenters, initWeights = [[0.0, 0.0], [1.0, 1.0]], [1.0, 1.0]
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is easier to read: initCenters = [[0.0, 0.0], [1.0, 1.0]]
initWeights = [1.0, 1.0] |
||
    >>> stkm = StreamingKMeansModel(initCenters, initWeights)
    >>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
    ...                        [0.9, 0.9], [1.1, 1.1]])
    >>> stkm = stkm.update(data, 1.0, u"batches")
    >>> stkm.centers
    array([[ 0.,  0.],
           [ 1.,  1.]])
    >>> stkm.predict([-0.1, -0.1]) == stkm.predict([0.1, 0.1]) == 0

**Contributor:** It should be sufficient to show only one example, and simulate what users would type in a console:

    >>> stkm.predict([-0.1, -0.1])
    1
    True
    >>> stkm.predict([0.9, 0.9]) == stkm.predict([1.1, 1.1]) == 1
    True
    >>> stkm.clusterWeights
    [3.0, 3.0]
    >>> decayFactor = 0.0
    >>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
    >>> stkm = stkm.update(data, 0.0, u"batches")
    >>> stkm.centers
    array([[ 0.2,  0.2],
           [ 1.5,  1.5]])
    >>> stkm.clusterWeights
    [1.0, 1.0]
    >>> stkm.predict([0.2, 0.2])
    0
    >>> stkm.predict([1.5, 1.5])
    1

    :param clusterCenters: Initial cluster centers.

**Contributor:** I'm not sure whether we should put the params doc before the doctest or after. But since we put it before the doctest in other classes, maybe we should do the same here.
    :param clusterWeights: List of weights assigned to each cluster.
    """
    def __init__(self, clusterCenters, clusterWeights):
        super(StreamingKMeansModel, self).__init__(centers=clusterCenters)
        self._clusterWeights = list(clusterWeights)

    @property
    def clusterWeights(self):
        """Convenience method to return the cluster weights."""
        return self._clusterWeights

    @ignore_unicode_prefix
    def update(self, data, decayFactor, timeUnit):
        """Update the centroids, according to the data.

        :param data: Should be an RDD that represents the new data.
        :param decayFactor: Forgetfulness of the previous centroids.
        :param timeUnit: Can be "batches" or "points".

            If points, then the decay factor is raised to the power of

**Contributor:** Missing indentation.
            the number of new points, and if batches, it is used as is.
        """
        if not isinstance(data, RDD):
            raise TypeError("data should be an RDD, got %s." % type(data))
        data = data.map(_convert_to_vector)
        decayFactor = float(decayFactor)
        if timeUnit not in ["batches", "points"]:
            raise ValueError(
                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
        vectorCenters = [_convert_to_vector(center) for center in self.centers]
        updatedModel = callMLlibFunc(
            "updateStreamingKMeansModel", vectorCenters, self._clusterWeights,
            data, decayFactor, timeUnit)
        self.centers = array(updatedModel[0])
        self._clusterWeights = list(updatedModel[1])
        return self
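
As a sanity check on the update formula in the docstring (a standalone recomputation, not part of the PR), plain NumPy reproduces the doctest numbers:

    import numpy as np

    def update_center(c_t, n_t, points, a):
        """c_t+1 = (c_t*n_t*a + x_t*m_t) / (n_t*a + m_t); n_t+1 = n_t*a + m_t."""
        x_t = np.mean(points, axis=0)  # centroid of the new points closest to c_t
        m_t = len(points)              # number of those points
        return (c_t * n_t * a + x_t * m_t) / (n_t * a + m_t), n_t * a + m_t

    # First doctest batch, a=1.0: center [0, 0] (weight 1) absorbs [-0.1, -0.1]
    # and [0.1, 0.1], staying at [0, 0] with weight 3.0.
    print(update_center(np.array([0.0, 0.0]), 1.0,
                        np.array([[-0.1, -0.1], [0.1, 0.1]]), 1.0))

    # Second doctest batch, a=0.0: the old centroid is forgotten, so the center
    # jumps to the single new point [0.2, 0.2] with weight 1.0.
    print(update_center(np.array([0.0, 0.0]), 3.0,
                        np.array([[0.2, 0.2]]), 0.0))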


class StreamingKMeans(object):
    """
    .. note:: Experimental

    Provides methods to set k, decayFactor, timeUnit to train and
    predict the incoming data.

    :param k: int, number of clusters.
    :param decayFactor: float, forgetfulness of the previous centroids.
    :param timeUnit: Can be "batches" or "points". If points, then the
        decay factor is raised to the power of the number of new points.
    """
    def __init__(self, k=2, decayFactor=1.0, timeUnit="batches"):
        self._k = k
        self._decayFactor = decayFactor
        if timeUnit not in ["batches", "points"]:
            raise ValueError(
                "timeUnit should be 'batches' or 'points', got %s." % timeUnit)
        self._timeUnit = timeUnit
        self._model = None

    def latestModel(self):
        """Return the latest model."""
        return self._model

    def _validate(self, dstream):
        if self._model is None:
            raise ValueError(
                "Initial centers should be set either by setInitialCenters "
                "or setRandomCenters.")
        if not isinstance(dstream, DStream):
            raise TypeError(
                "Expected dstream to be of type DStream, "
                "got type %s" % type(dstream))

    def setK(self, k):
        """Set number of clusters."""
        self._k = k
        return self

    def setDecayFactor(self, decayFactor):
        """Set decay factor."""
        self._decayFactor = decayFactor
        return self

    def setHalfLife(self, halfLife, timeUnit):
        """
        Set the number of batches or points after which the centroids
        have half of their previous weight.
        """
        self._timeUnit = timeUnit
        self._decayFactor = exp(log(0.5) / halfLife)
        return self
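
A quick illustrative check of the half-life relation (not part of the PR): `exp(log(0.5) / halfLife)` is just `0.5 ** (1 / halfLife)`, so after `halfLife` time units the accumulated discount is exactly one half:

    from math import exp, log

    halfLife = 4.0
    decayFactor = exp(log(0.5) / halfLife)  # same formula as setHalfLife
    assert abs(decayFactor ** halfLife - 0.5) < 1e-12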

    def setInitialCenters(self, centers, weights):
        """Set initial centers. Should be set before calling trainOn."""
        self._model = StreamingKMeansModel(centers, weights)
        return self

    def setRandomCenters(self, dim, weight, seed):
        """
        Set the initial centers to be random samples from
        a Gaussian population with constant weights.
        """
        rng = random.RandomState(seed)
        clusterCenters = rng.randn(self._k, dim)
        clusterWeights = tile(weight, self._k)
        self._model = StreamingKMeansModel(clusterCenters, clusterWeights)
        return self

    def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            self._model.update(rdd, self._decayFactor, self._timeUnit)

        dstream.foreachRDD(update)

    def predictOn(self, dstream):
        """
        Make predictions on a dstream.
        Returns a transformed dstream object.
        """
        self._validate(dstream)
        return dstream.map(lambda x: self._model.predict(x))

    def predictOnValues(self, dstream):
        """
        Make predictions on a keyed dstream.
        Returns a transformed dstream object.
        """
        self._validate(dstream)
        return dstream.mapValues(lambda x: self._model.predict(x))

def _test():
    import doctest
    globs = globals().copy()

**Comment:** I saw the Java/Scala examples were implemented this way. However, it is hard to imagine a case where you have pre-labeled test data for k-means, because the initial clusters are usually randomized. Let's keep this as is.
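
To see the whole API in motion without a directory of text files, here is a hedged, self-contained sketch (not part of the PR; the data, seed, and timeout are invented for illustration) that drives StreamingKMeans with `ssc.queueStream`:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.mllib.clustering import StreamingKMeans

    sc = SparkContext("local[2]", "StreamingKMeansSketch")
    ssc = StreamingContext(sc, 1)

    # Two tiny training batches with points around (0, 0) and (1, 1).
    batches = [
        sc.parallelize([[0.0, 0.1], [0.1, 0.0], [1.0, 1.1], [1.1, 1.0]]),
        sc.parallelize([[-0.1, 0.0], [0.9, 1.0]]),
    ]
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(2, 1.0, 42)
    model.trainOn(ssc.queueStream(batches))

    # Print cluster assignments for a held-out stream of points.
    testStream = ssc.queueStream([sc.parallelize([[0.0, 0.0], [1.0, 1.0]])])
    model.predictOn(testStream).pprint()

    ssc.start()
    ssc.awaitTerminationOrTimeout(10)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)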