
Commit 4554ddd

Author: DB Tsai (committed)
Commit message: first commit
Parent: 69b6fed

File tree: 5 files changed (+85, -68 lines)


mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala

Lines changed: 43 additions & 33 deletions
@@ -19,8 +19,6 @@ package org.apache.spark.mllib.clustering
 
 import scala.collection.mutable.ArrayBuffer
 
-import breeze.linalg.{DenseVector => BDV, Vector => BV}
-
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.Logging
 import org.apache.spark.SparkContext._
@@ -127,10 +125,10 @@ class KMeans private (
     // Compute squared norms and cache them.
     val norms = data.map(Vectors.norm(_, 2.0))
     norms.persist()
-    val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
-      new BreezeVectorWithNorm(v, norm)
+    val zippedData = data.zip(norms).map { case (v, norm) =>
+      new VectorWithNorm(v, norm)
     }
-    val model = runBreeze(breezeData)
+    val model = runAlgorithm(zippedData)
     norms.unpersist()
 
     // Warn at the end of the run as well, for increased visibility.
@@ -142,9 +140,9 @@ class KMeans private (
   }
 
   /**
-   * Implementation of K-Means using breeze.
+   * Implementation of K-Means algorithm.
    */
-  private def runBreeze(data: RDD[BreezeVectorWithNorm]): KMeansModel = {
+  private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {
 
     val sc = data.sparkContext
 
@@ -170,9 +168,18 @@ class KMeans private (
 
     // Execute iterations of Lloyd's algorithm until all runs have converged
     while (iteration < maxIterations && !activeRuns.isEmpty) {
-      type WeightedPoint = (BV[Double], Long)
+      type WeightedPoint = (Array[Double], Long)
       def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
-        (p1._1 += p2._1, p1._2 + p2._2)
+        val v1 = p1._1
+        val v2 = p2._1
+        require(v1.size == v2.size)
+        val size = v1.size
+        var i = 0
+        while (i < size) {
+          v1(i) += v2(i)
+          i += 1
+        }
+        (v1, p1._2 + p2._2)
       }
 
       val activeCenters = activeRuns.map(r => centers(r)).toArray
@@ -185,16 +192,17 @@
         val thisActiveCenters = bcActiveCenters.value
         val runs = thisActiveCenters.length
         val k = thisActiveCenters(0).length
-        val dims = thisActiveCenters(0)(0).vector.length
+        val dims = thisActiveCenters(0)(0).vector.size
 
-        val sums = Array.fill(runs, k)(BDV.zeros[Double](dims).asInstanceOf[BV[Double]])
+        val sums = Array.fill(runs, k)(Array.ofDim[Double](dims))
         val counts = Array.fill(runs, k)(0L)
 
         points.foreach { point =>
           (0 until runs).foreach { i =>
             val (bestCenter, cost) = KMeans.findClosest(thisActiveCenters(i), point)
             costAccums(i) += cost
-            sums(i)(bestCenter) += point.vector
+            val sum = sums(i)(bestCenter)
+            point.vector.foreachActive((index, value) => sum(index) += value)
             counts(i)(bestCenter) += 1
           }
         }
@@ -212,8 +220,12 @@
         while (j < k) {
           val (sum, count) = totalContribs((i, j))
           if (count != 0) {
-            sum /= count.toDouble
-            val newCenter = new BreezeVectorWithNorm(sum)
+            var i = 0
+            while (i < sum.size) {
+              sum(i) /= count
+              i += 1
+            }
+            val newCenter = new VectorWithNorm(sum)
             if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
               changed = true
             }
@@ -245,18 +257,18 @@
 
     logInfo(s"The cost for the best run is $minCost.")
 
-    new KMeansModel(centers(bestRun).map(c => Vectors.fromBreeze(c.vector)))
+    new KMeansModel(centers(bestRun).map(c => c.vector))
   }
 
   /**
    * Initialize `runs` sets of cluster centers at random.
    */
-  private def initRandom(data: RDD[BreezeVectorWithNorm])
-  : Array[Array[BreezeVectorWithNorm]] = {
+  private def initRandom(data: RDD[VectorWithNorm])
+  : Array[Array[VectorWithNorm]] = {
     // Sample all the cluster centers in one pass to avoid repeated scans
     val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
     Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
-      new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm)
+      new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm)
     }.toArray)
   }
 
@@ -269,8 +281,8 @@
   *
   * The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
   */
-  private def initKMeansParallel(data: RDD[BreezeVectorWithNorm])
-  : Array[Array[BreezeVectorWithNorm]] = {
+  private def initKMeansParallel(data: RDD[VectorWithNorm])
+  : Array[Array[VectorWithNorm]] = {
     // Initialize each run's center to a random point
     val seed = new XORShiftRandom().nextInt()
     val sample = data.takeSample(true, runs, seed).toSeq
@@ -376,8 +388,8 @@ object KMeans {
   * Returns the index of the closest center to the given point, as well as the squared distance.
   */
  private[mllib] def findClosest(
-      centers: TraversableOnce[BreezeVectorWithNorm],
-      point: BreezeVectorWithNorm): (Int, Double) = {
+      centers: TraversableOnce[VectorWithNorm],
+      point: VectorWithNorm): (Int, Double) = {
    var bestDistance = Double.PositiveInfinity
    var bestIndex = 0
    var i = 0
@@ -402,35 +414,33 @@
   * Returns the K-means cost of a given point against the given cluster centers.
   */
  private[mllib] def pointCost(
-      centers: TraversableOnce[BreezeVectorWithNorm],
-      point: BreezeVectorWithNorm): Double =
+      centers: TraversableOnce[VectorWithNorm],
+      point: VectorWithNorm): Double =
    findClosest(centers, point)._2
 
  /**
   * Returns the squared Euclidean distance between two vectors computed by
   * [[org.apache.spark.mllib.util.MLUtils#fastSquaredDistance]].
   */
  private[clustering] def fastSquaredDistance(
-      v1: BreezeVectorWithNorm,
-      v2: BreezeVectorWithNorm): Double = {
+      v1: VectorWithNorm,
+      v2: VectorWithNorm): Double = {
    MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
  }
}
 
/**
- * A breeze vector with its norm for fast distance computation.
+ * A vector with its norm for fast distance computation.
 *
 * @see [[org.apache.spark.mllib.clustering.KMeans#fastSquaredDistance]]
 */
private[clustering]
-class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {
-
-  def this(vector: BV[Double]) = this(vector, Vectors.norm(Vectors.fromBreeze(vector), 2.0))
+class VectorWithNorm(val vector: Vector, val norm: Double) extends Serializable {
 
-  def this(array: Array[Double]) = this(new BDV[Double](array))
+  def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
 
-  def this(v: Vector) = this(v.toBreeze)
+  def this(array: Array[Double]) = this(Vectors.dense(array))
 
  /** Converts the vector to a dense vector. */
-  def toDense = new BreezeVectorWithNorm(vector.toDenseVector, norm)
+  def toDense = new VectorWithNorm(Vectors.dense(vector.toArray), norm)
}
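
Note on the KMeans.scala change: the hot path drops Breeze entirely. In mergeContribs, Breeze's overloaded `p1._1 += p2._1` becomes an explicit in-place while loop over a raw Array[Double], and the per-point update uses foreachActive so sparse points only touch their nonzero entries. A minimal standalone sketch of the merge pattern (the MergeContribsSketch wrapper is illustrative, not part of the commit):

object MergeContribsSketch {
  // Per-cluster contribution: a running coordinate sum plus a point count.
  type WeightedPoint = (Array[Double], Long)

  // Adds p2 into p1 in place and returns p1, mirroring the diff's mergeContribs;
  // the manual loop avoids Breeze dispatch and temporary allocations per merge.
  def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
    val (v1, c1) = p1
    val (v2, c2) = p2
    require(v1.length == v2.length)
    var i = 0
    while (i < v1.length) {
      v1(i) += v2(i)
      i += 1
    }
    (v1, c1 + c2)
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeContribs((Array(1.0, 2.0), 3L), (Array(0.5, 0.5), 2L))
    println(merged._1.mkString("[", ", ", "]") + ", count = " + merged._2)
    // prints [1.5, 2.5], count = 5
  }
}

Mutating and returning the left argument is safe here because the tuples are partial aggregation buffers owned by the reducer, never shared records.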

mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala

Lines changed: 5 additions & 5 deletions
@@ -32,14 +32,14 @@ class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable {
 
  /** Returns the cluster index that a given point belongs to. */
  def predict(point: Vector): Int = {
-    KMeans.findClosest(clusterCentersWithNorm, new BreezeVectorWithNorm(point))._1
+    KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
  }
 
  /** Maps given points to their cluster indices. */
  def predict(points: RDD[Vector]): RDD[Int] = {
    val centersWithNorm = clusterCentersWithNorm
    val bcCentersWithNorm = points.context.broadcast(centersWithNorm)
-    points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))._1)
+    points.map(p => KMeans.findClosest(bcCentersWithNorm.value, new VectorWithNorm(p))._1)
  }
 
  /** Maps given points to their cluster indices. */
@@ -53,9 +53,9 @@ class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable {
  def computeCost(data: RDD[Vector]): Double = {
    val centersWithNorm = clusterCentersWithNorm
    val bcCentersWithNorm = data.context.broadcast(centersWithNorm)
-    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new BreezeVectorWithNorm(p))).sum()
+    data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum()
  }
 
-  private def clusterCentersWithNorm: Iterable[BreezeVectorWithNorm] =
-    clusterCenters.map(new BreezeVectorWithNorm(_))
+  private def clusterCentersWithNorm: Iterable[VectorWithNorm] =
+    clusterCenters.map(new VectorWithNorm(_))
}
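
Note on KMeansModel.scala: the edits are mechanical renames, but they preserve a deliberate pattern: centers are wrapped in VectorWithNorm once (and broadcast for RDD inputs), so every findClosest call reuses a cached L2 norm instead of recomputing it per point. A rough standalone sketch of that wrap-once idea, using a hypothetical CenterWithNorm stand-in since VectorWithNorm is private to the clustering package:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

object PredictSketch {
  // Hypothetical stand-in for the package-private VectorWithNorm.
  final case class CenterWithNorm(vector: Vector, norm: Double)

  // Wrap each center once; later lookups reuse the cached norms.
  def wrap(centers: Array[Vector]): Array[CenterWithNorm] =
    centers.map(c => CenterWithNorm(c, Vectors.norm(c, 2.0)))

  // Plain squared distance for the sketch; the real findClosest uses the
  // cached norms to bound and shortcut this computation.
  def predict(centers: Array[CenterWithNorm], point: Vector): Int =
    centers.indices.minBy { i =>
      val c = centers(i).vector.toArray
      val p = point.toArray
      var d = 0.0
      var j = 0
      while (j < c.length) {
        val diff = c(j) - p(j)
        d += diff * diff
        j += 1
      }
      d
    }

  def main(args: Array[String]): Unit = {
    val centers = wrap(Array(Vectors.dense(0.0, 0.0), Vectors.dense(5.0, 5.0)))
    println(predict(centers, Vectors.dense(4.0, 6.0))) // prints 1
  }
}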

mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala

Lines changed: 17 additions & 12 deletions
@@ -19,9 +19,9 @@ package org.apache.spark.mllib.clustering
 
 import scala.util.Random
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV, norm => breezeNorm}
-
 import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.BLAS.axpy
 
 /**
  * An utility object to run K-means locally. This is private to the ML package because it's used
@@ -35,14 +35,14 @@ private[mllib] object LocalKMeans extends Logging {
   */
  def kMeansPlusPlus(
      seed: Int,
-      points: Array[BreezeVectorWithNorm],
+      points: Array[VectorWithNorm],
      weights: Array[Double],
      k: Int,
      maxIterations: Int
-  ): Array[BreezeVectorWithNorm] = {
+  ): Array[VectorWithNorm] = {
    val rand = new Random(seed)
-    val dimensions = points(0).vector.length
-    val centers = new Array[BreezeVectorWithNorm](k)
+    val dimensions = points(0).vector.size
+    val centers = new Array[VectorWithNorm](k)
 
    // Initialize centers by sampling using the k-means++ procedure.
    centers(0) = pickWeighted(rand, points, weights).toDense
@@ -75,14 +75,12 @@
    while (moved && iteration < maxIterations) {
      moved = false
      val counts = Array.fill(k)(0.0)
-      val sums = Array.fill(k)(
-        BDV.zeros[Double](dimensions).asInstanceOf[BV[Double]]
-      )
+      val sums = Array.fill(k)(Array.ofDim[Double](dimensions))
      var i = 0
      while (i < points.length) {
        val p = points(i)
        val index = KMeans.findClosest(centers, p)._1
-        breeze.linalg.axpy(weights(i), p.vector, sums(index))
+        axpy(weights(i), p.vector, Vectors.dense(sums(index)))
        counts(index) += weights(i)
        if (index != oldClosest(i)) {
          moved = true
@@ -97,8 +95,15 @@
          // Assign center to a random point
          centers(j) = points(rand.nextInt(points.length)).toDense
        } else {
-          sums(j) /= counts(j)
-          centers(j) = new BreezeVectorWithNorm(sums(j))
+          val sum = sums(j)
+          val count = counts(j)
+          val size = sum.size
+          var i = 0
+          while (i < size) {
+            sum(i) /= count
+            i += 1
+          }
+          centers(j) = new VectorWithNorm(sums(j))
        }
        j += 1
      }
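
Note on LocalKMeans.scala: one subtlety in the hunk above is that Vectors.dense(sums(index)) wraps the existing array (a DenseVector keeps the array reference rather than copying it), so axpy accumulates directly into sums(index). The real BLAS.axpy is package-private; a plain-Scala stand-in for the dense case behaves roughly like this (AxpySketch is illustrative, not the commit's code):

object AxpySketch {
  // y := a * x + y, accumulating into y in place; a stand-in for the dense
  // case of the package-private BLAS.axpy imported in the diff above.
  def axpy(a: Double, x: Array[Double], y: Array[Double]): Unit = {
    require(x.length == y.length)
    var i = 0
    while (i < x.length) {
      y(i) += a * x(i)
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val sum = Array(1.0, 1.0)
    axpy(0.5, Array(2.0, 4.0), sum) // add a point weighted by 0.5 into the running sum
    println(sum.mkString(", ")) // prints 2.0, 3.0
  }
}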

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

Lines changed: 14 additions & 12 deletions
@@ -19,16 +19,17 @@ package org.apache.spark.mllib.util
 
 import scala.reflect.ClassTag
 
-import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
-  squaredDistance => breezeSquaredDistance}
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
+  squaredDistance => breezeSquaredDistance}
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliCellSampler
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg.BLAS.dot
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.DStream
@@ -281,9 +282,9 @@
   * @return squared distance between v1 and v2 within the specified precision
   */
  private[mllib] def fastSquaredDistance(
-      v1: BV[Double],
+      v1: Vector,
      norm1: Double,
-      v2: BV[Double],
+      v2: Vector,
      norm2: Double,
      precision: Double = 1e-6): Double = {
    val n = v1.size
@@ -306,16 +307,17 @@
     */
    val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
    if (precisionBound1 < precision) {
-      sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)
-    } else if (v1.isInstanceOf[BSV[Double]] || v2.isInstanceOf[BSV[Double]]) {
-      val dot = v1.dot(v2)
-      sqDist = math.max(sumSquaredNorm - 2.0 * dot, 0.0)
-      val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dot)) / (sqDist + EPSILON)
+      sqDist = sumSquaredNorm - 2.0 * dot(v1, v2)
+    } else if (v1.isInstanceOf[SparseVector] || v2.isInstanceOf[SparseVector]) {
+      val dotValue = dot(v1, v2)
+      sqDist = math.max(sumSquaredNorm - 2.0 * dotValue, 0.0)
+      val precisionBound2 = EPSILON * (sumSquaredNorm + 2.0 * math.abs(dotValue)) / (sqDist + EPSILON)
      if (precisionBound2 > precision) {
-        sqDist = breezeSquaredDistance(v1, v2)
+        // TODO: breezeSquaredDistance is slow, so we should replace it with our own implementation.
+        sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
      }
    } else {
-      sqDist = breezeSquaredDistance(v1, v2)
+      sqDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
    }
    sqDist
  }
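
Note on MLUtils.scala: fastSquaredDistance exploits the expansion ||v1 - v2||^2 = ||v1||^2 + ||v2||^2 - 2 * (v1 . v2), which is cheap when the norms are already cached and the dot product can use sparsity, but loses precision when the two vectors are nearly equal; the precisionBound checks fall back to the exact breezeSquaredDistance in that case. A sketch of the identity on plain arrays (the real method takes mllib Vectors plus the precomputed norms):

object FastDistanceSketch {
  // Squared distance via the norm expansion. max(_, 0.0) clamps the small
  // negative results that floating-point cancellation can produce.
  def squaredDistanceViaNorms(
      v1: Array[Double], norm1: Double,
      v2: Array[Double], norm2: Double): Double = {
    require(v1.length == v2.length)
    var dot = 0.0
    var i = 0
    while (i < v1.length) {
      dot += v1(i) * v2(i)
      i += 1
    }
    math.max(norm1 * norm1 + norm2 * norm2 - 2.0 * dot, 0.0)
  }

  def main(args: Array[String]): Unit = {
    // A 3-4-5 right triangle: the squared distance should be 25.0.
    println(squaredDistanceViaNorms(Array(3.0, 0.0), 3.0, Array(0.0, 4.0), 4.0))
  }
}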

mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -44,18 +44,18 @@ class MLUtilsSuite extends FunSuite with MLlibTestSparkContext {
  test("fast squared distance") {
    val a = (30 to 0 by -1).map(math.pow(2.0, _)).toArray
    val n = a.length
-    val v1 = new BDV[Double](a)
-    val norm1 = breezeNorm(v1, 2.0)
+    val v1 = Vectors.dense(a)
+    val norm1 = Vectors.norm(v1, 2.0)
    val precision = 1e-6
    for (m <- 0 until n) {
      val indices = (0 to m).toArray
      val values = indices.map(i => a(i))
-      val v2 = new BSV[Double](indices, values, n)
-      val norm2 = breezeNorm(v2, 2.0)
-      val squaredDist = breezeSquaredDistance(v1, v2)
+      val v2 = Vectors.sparse(n, indices, values)
+      val norm2 = Vectors.norm(v2, 2.0)
+      val squaredDist = breezeSquaredDistance(v1.toBreeze, v2.toBreeze)
      val fastSquaredDist1 = fastSquaredDistance(v1, norm1, v2, norm2, precision)
      assert((fastSquaredDist1 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
-      val fastSquaredDist2 = fastSquaredDistance(v1, norm1, v2.toDenseVector, norm2, precision)
+      val fastSquaredDist2 = fastSquaredDistance(v1, norm1, Vectors.dense(v2.toArray), norm2, precision)
      assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
    }
  }
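
Note on MLUtilsSuite.scala: the test now builds its fixtures with the mllib vector API instead of Breeze. For reference, a quick sketch of the constructors it uses, with arbitrary values rather than the test's data:

import org.apache.spark.mllib.linalg.Vectors

object VectorsApiSketch {
  def main(args: Array[String]): Unit = {
    val dense = Vectors.dense(Array(1.0, 0.0, 4.0)) // dense vector from an array
    val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 4.0)) // size, indices, values
    println(Vectors.norm(dense, 2.0)) // L2 norm, as computed for norm1/norm2
    println(sparse.toArray.mkString(", ")) // densify, as in Vectors.dense(v2.toArray)
  }
}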
