@@ -41,7 +41,9 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe
   with HasSeed with HasPredictionCol with HasTol {
 
   /**
-   * The number of clusters to create (k). Must be > 1. Default: 2.
+   * The number of clusters to create (k). Must be > 1. Note that it is possible for fewer than
+   * k clusters to be returned, for example, if there are fewer than k distinct points to cluster.
+   * Default: 2.
    * @group param
    */
   @Since("1.5.0")
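The documented behavior is easy to see end to end. A minimal sketch, assuming a local SparkContext `sc` and the RDD-based API; the data below is hypothetical and mirrors the "fewer distinct points than clusters" test added later in this PR:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Only one distinct point exists, so even with k = 2 the trained model
// may come back with a single cluster center.
val data = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(1.0, 2.0, 3.0)))
val model = KMeans.train(data, k = 2, maxIterations = 1)
assert(model.clusterCenters.length <= 2)  // can be 1 after this change
```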
mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala (27 changes: 17 additions & 10 deletions)
@@ -56,13 +56,15 @@ class KMeans private (
   def this() = this(2, 20, KMeans.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong())
 
   /**
-   * Number of clusters to create (k).
+   * Number of clusters to create (k). Note that it is possible for fewer than k clusters to
+   * be returned, for example, if there are fewer than k distinct points to cluster.
    */
   @Since("1.4.0")
   def getK: Int = k
 
   /**
-   * Set the number of clusters to create (k). Default: 2.
+   * Set the number of clusters to create (k). Note that it is possible for fewer than k clusters to
+   * be returned, for example, if there are fewer than k distinct points to cluster. Default: 2.
    */
   @Since("0.8.0")
   def setK(k: Int): this.type = {
@@ -323,7 +325,10 @@ class KMeans private (
    * Initialize a set of cluster centers at random.
    */
   private def initRandom(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
-    data.takeSample(true, k, new XORShiftRandom(this.seed).nextInt()).map(_.toDense)
+    // Select without replacement; may still produce duplicates if the data has < k distinct
+    // points, so deduplicate the centroids to match the behavior of k-means|| in the same situation
+    data.takeSample(false, k, new XORShiftRandom(this.seed).nextInt())
+      .map(_.vector).distinct.map(new VectorWithNorm(_))
   }
 
   /**
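A note on the change above: `takeSample(false, ...)` samples k distinct rows, but distinct rows can still hold equal vectors, so the dedup on `_.vector` is what actually collapses repeated points. A minimal sketch of that step in isolation, using plain Scala collections rather than an RDD (the names are illustrative, not the real code path):

```scala
import org.apache.spark.mllib.linalg.Vectors

// Two of the three sampled rows carry the same point; MLlib vectors use
// structural equality, so distinct collapses them to one candidate center.
val sampled = Seq(
  Vectors.dense(1.0, 2.0),
  Vectors.dense(1.0, 2.0),
  Vectors.dense(3.0, 4.0))
val centers = sampled.distinct
assert(centers.length == 2)  // fewer centers than rows sampled
```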
@@ -335,7 +340,7 @@ class KMeans private (
    *
    * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
    */
-  private def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
+  private[clustering] def initKMeansParallel(data: RDD[VectorWithNorm]): Array[VectorWithNorm] = {
     // Initialize empty centers and point costs.
     var costs = data.map(_ => Double.PositiveInfinity)
 
@@ -378,19 +383,21 @@ class KMeans private (
     costs.unpersist(blocking = false)
     bcNewCentersList.foreach(_.destroy(false))
 
-    if (centers.size == k) {
-      centers.toArray
+    val distinctCenters = centers.map(_.vector).distinct.map(new VectorWithNorm(_))
+
+    if (distinctCenters.size <= k) {
+      distinctCenters.toArray
     } else {
-      // Finally, we might have a set of more or less than k candidate centers; weight each
+      // Finally, we might have a set of more than k distinct candidate centers; weight each
       // candidate by the number of points in the dataset mapping to it and run a local k-means++
       // on the weighted centers to pick k of them
-      val bcCenters = data.context.broadcast(centers)
+      val bcCenters = data.context.broadcast(distinctCenters)
       val countMap = data.map(KMeans.findClosest(bcCenters.value, _)._1).countByValue()
 
       bcCenters.destroy(blocking = false)
 
-      val myWeights = centers.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
-      LocalKMeans.kMeansPlusPlus(0, centers.toArray, myWeights, k, 30)
+      val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
+      LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
     }
   }
 }
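For readers following the `else` branch: each distinct candidate is weighted by how many input points are closest to it, and a local k-means++ over those weighted candidates picks the final k. A rough sketch of just the weighting, with plain collections standing in for the RDD and a naive `closestIndex` standing in for `KMeans.findClosest` (both stand-ins are assumptions, not the real API):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Weight each candidate center by the number of points mapping to it.
def weightCandidates(points: Seq[Vector], candidates: IndexedSeq[Vector]): Array[Double] = {
  def closestIndex(p: Vector): Int =
    candidates.indices.minBy(i => Vectors.sqdist(candidates(i), p))
  val counts: Map[Int, Long] =
    points.groupBy(closestIndex).map { case (i, ps) => i -> ps.size.toLong }
  // Candidates that attract no points get weight 0, mirroring getOrElse(_, 0L).
  candidates.indices.map(counts.getOrElse(_, 0L).toDouble).toArray
}
```

These weights are what `LocalKMeans.kMeansPlusPlus` receives in the diff above.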
@@ -29,6 +29,8 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import org.apache.spark.mllib.clustering.KMeans.{K_MEANS_PARALLEL, RANDOM}
 
+  private val seed = 42
+
   test("single cluster") {
     val data = sc.parallelize(Array(
       Vectors.dense(1.0, 2.0, 6.0),
@@ -38,7 +40,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     val center = Vectors.dense(1.0, 3.0, 4.0)
 
-    // No matter how many runs or iterations we use, we should get one cluster,
+    // No matter how many iterations we use, we should get one cluster,
     // centered at the mean of the points
 
     var model = KMeans.train(data, k = 1, maxIterations = 1)
@@ -50,44 +52,72 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
+    model = KMeans.train(data, k = 1, maxIterations = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
     model = KMeans.train(
-      data, k = 1, maxIterations = 1, runs = 1, initializationMode = K_MEANS_PARALLEL)
+      data, k = 1, maxIterations = 1, initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
-  test("no distinct points") {
+  test("fewer distinct points than clusters") {
     val data = sc.parallelize(
       Array(
         Vectors.dense(1.0, 2.0, 3.0),
         Vectors.dense(1.0, 2.0, 3.0),
         Vectors.dense(1.0, 2.0, 3.0)),
       2)
-    val center = Vectors.dense(1.0, 2.0, 3.0)
 
-    // Make sure code runs.
-    var model = KMeans.train(data, k = 2, maxIterations = 1)
-    assert(model.clusterCenters.size === 2)
-  }
+    var model = KMeans.train(data, k = 2, maxIterations = 1, initializationMode = "random")
+    assert(model.clusterCenters.length === 1)
 
-  test("more clusters than points") {
-    val data = sc.parallelize(
-      Array(
-        Vectors.dense(1.0, 2.0, 3.0),
-        Vectors.dense(1.0, 3.0, 4.0)),
-      2)
+    model = KMeans.train(data, k = 2, maxIterations = 1, initializationMode = "k-means||")
+    assert(model.clusterCenters.length === 1)
+  }
 
-    // Make sure code runs.
-    var model = KMeans.train(data, k = 3, maxIterations = 1)
-    assert(model.clusterCenters.size === 3)
+  test("unique cluster centers") {
+    val rng = new Random(seed)
+    val numDistinctPoints = 10
+    val points = (0 until numDistinctPoints).map(i => Vectors.dense(Array.fill(3)(rng.nextDouble)))
+    val data = sc.parallelize(points.flatMap(Array.fill(1 + rng.nextInt(3))(_)), 2)
+    val normedData = data.map(new VectorWithNorm(_))
+
+    // less centers than k
Review comment (Contributor): Can we also test the "random" method here? I was going to suggest putting the test cases inside Seq("k-means||", "random").foreach { initMode =>, but we run the specific parallel case. Maybe just manually add it?

+    val km = new KMeans().setK(50)
+      .setMaxIterations(5)
+      .setInitializationMode("k-means||")
+      .setInitializationSteps(10)
+      .setSeed(seed)
+    val initialCenters = km.initKMeansParallel(normedData).map(_.vector)
+    assert(initialCenters.length === initialCenters.distinct.length)
Review comment (Contributor): also check initialCenters.length <= numDistinctPoints where numDistinctPoints is 10 (defined above) at the moment.

+    assert(initialCenters.length <= numDistinctPoints)
+
+    val model = km.run(data)
+    val finalCenters = model.clusterCenters
+    assert(finalCenters.length === finalCenters.distinct.length)
+
+    // run local k-means
+    val k = 10
+    val km2 = new KMeans().setK(k)
+      .setMaxIterations(5)
+      .setInitializationMode("k-means||")
+      .setInitializationSteps(10)
+      .setSeed(seed)
+    val initialCenters2 = km2.initKMeansParallel(normedData).map(_.vector)
+    assert(initialCenters2.length === initialCenters2.distinct.length)
Review comment (sethah, Contributor, Oct 24, 2016): also check initialCenters2.length === k

Reply (Member Author): This condition failed, though it should be OK. The problem was that the data setup mapped each of the 10 distinct points to 0-3 copies, meaning there could be fewer than 10 distinct points in the end. I just made that 1-3 copies and it works. Fixed the doc problem too, thanks.
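A sketch to make the reply concrete, reusing `points` and `rng` as defined in the test above (this mirrors the corrected `flatMap` line; the explicit lambda is only for readability):

```scala
// 0 + rng.nextInt(3) could yield zero copies and silently drop a distinct
// point; 1 + rng.nextInt(3) guarantees 1 to 3 copies, so all 10 survive.
val data = sc.parallelize(points.flatMap(p => Array.fill(1 + rng.nextInt(3))(p)), 2)
```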

+    assert(initialCenters2.length === k)
+
+    val model2 = km2.run(data)
+    val finalCenters2 = model2.clusterCenters
+    assert(finalCenters2.length === finalCenters2.distinct.length)
+
+    val km3 = new KMeans().setK(k)
+      .setMaxIterations(5)
+      .setInitializationMode("random")
+      .setSeed(seed)
+    val model3 = km3.run(data)
+    val finalCenters3 = model3.clusterCenters
+    assert(finalCenters3.length === finalCenters3.distinct.length)
   }
 
   test("deterministic initialization") {
@@ -97,12 +127,12 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
       // Create three deterministic models and compare cluster means
-      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
-        initializationMode = initMode, seed = 42)
+      val model1 = KMeans.train(rdd, k = 10, maxIterations = 2,
+        initializationMode = initMode, seed = seed)
       val centers1 = model1.clusterCenters
 
-      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1,
-        initializationMode = initMode, seed = 42)
+      val model2 = KMeans.train(rdd, k = 10, maxIterations = 2,
+        initializationMode = initMode, seed = seed)
       val centers2 = model2.clusterCenters
 
       centers1.zip(centers2).foreach { case (c1, c2) =>
@@ -119,7 +149,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     )
     val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
 
-    // No matter how many runs or iterations we use, we should get one cluster,
+    // No matter how many iterations we use, we should get one cluster,
     // centered at the mean of the points
 
     val center = Vectors.dense(1.0, 3.0, 4.0)
@@ -134,17 +164,10 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
   }
 
@@ -165,7 +188,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     data.persist()
 
-    // No matter how many runs or iterations we use, we should get one cluster,
+    // No matter how many iterations we use, we should get one cluster,
     // centered at the mean of the points
 
     val center = Vectors.sparse(n, Seq((0, 1.0), (1, 3.0), (2, 4.0)))
@@ -179,17 +202,10 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     model = KMeans.train(data, k = 1, maxIterations = 5)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 5)
+    model = KMeans.train(data, k = 1, maxIterations = 1, initializationMode = RANDOM)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1, initializationMode = RANDOM)
-    assert(model.clusterCenters.head ~== center absTol 1E-5)
-
-    model = KMeans.train(data, k = 1, maxIterations = 1, runs = 1,
-      initializationMode = K_MEANS_PARALLEL)
+    model = KMeans.train(data, k = 1, maxIterations = 1, initializationMode = K_MEANS_PARALLEL)
     assert(model.clusterCenters.head ~== center absTol 1E-5)
 
     data.unpersist()
@@ -230,11 +246,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
     model = KMeans.train(rdd, k = 5, maxIterations = 10)
     assert(model.clusterCenters.sortBy(VectorWithCompare(_))
       .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
-
-    // Neither should more runs
-    model = KMeans.train(rdd, k = 5, maxIterations = 10, runs = 5)
-    assert(model.clusterCenters.sortBy(VectorWithCompare(_))
-      .zip(points.sortBy(VectorWithCompare(_))).forall(x => x._1 ~== (x._2) absTol 1E-5))
   }
 
   test("two clusters") {
@@ -250,7 +261,7 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) {
       // Two iterations are sufficient no matter where the initial centers are.
-      val model = KMeans.train(rdd, k = 2, maxIterations = 2, runs = 1, initMode)
+      val model = KMeans.train(rdd, k = 2, maxIterations = 2, initMode)
 
       val predicts = model.predict(rdd).collect()
 