@@ -44,19 +44,11 @@ object PowerIterationClustering {
   type DGraph = Graph[Double, Double]
   type IndexedVector[Double] = (Long, BDV[Double])
 
-
   // Terminate iteration when norm changes by less than this value
-  private[mllib] val defaultMinNormChange: Double = 1e-11
-
-  // Default sigma for Gaussian Distance calculations
-  private[mllib] val defaultSigma = 1.0
+  val defaultMinNormChange: Double = 1e-11
 
   // Default number of iterations for PIC loop
-  private[mllib] val defaultIterations: Int = 20
-
-  // Default minimum affinity between points - lower than this it is considered
-  // zero and no edge will be created
-  private[mllib] val defaultMinAffinity = 1e-11
+  val defaultIterations: Int = 20
 
   // Do not allow divide by zero: change to this value instead
   val defaultDivideByZeroVal: Double = 1e-15
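
The surviving constants drive the PIC loop's stopping and numeric-safety checks. As a rough
illustration only (the converged helper and its priorNorm/norm arguments are hypothetical, not
part of this patch), the norm-change test these defaults configure might look like:

    // Stop once the pseudo-eigenvector's norm change falls below the threshold,
    // substituting defaultDivideByZeroVal to avoid dividing by a zero prior norm.
    def converged(priorNorm: Double, norm: Double): Boolean = {
      val denom = if (Math.abs(priorNorm) < defaultDivideByZeroVal) defaultDivideByZeroVal
                  else priorNorm
      Math.abs((norm - priorNorm) / denom) < defaultMinNormChange
    }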
@@ -73,11 +65,6 @@ object PowerIterationClustering {
    * @param nClusters Number of clusters to create
    * @param nIterations Number of iterations of the PIC algorithm
    *                    that calculates primary PseudoEigenvector and Eigenvalue
-   * @param sigma Sigma for the Gaussian kernel used in the affinity calculation:
-   *              exp(-(x - y)**2 / (2 * sigma**2))
-   * @param minAffinity Minimum affinity between two points in the input dataset: below
-   *                    this threshold the affinity is considered "close to" zero and
-   *                    no Edge will be created between those points in the sparse matrix
    * @param nRuns Number of runs for the KMeans clustering
    * @return Tuple of (Seq[(Cluster Id, Cluster Center)],
    *         Seq[(VertexId, ClusterID Membership)])
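
(For reference, the affinity these removed parameters configured is the Gaussian kernel
w_ij = exp(-||x_i - x_j||^2 / (2 * sigma^2)), with any w_ij below minAffinity treated as zero
so that no edge is created; see the gaussianDist and createSparseEdgesRdd removals below.)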
@@ -86,8 +73,6 @@ object PowerIterationClustering {
     G: Graph[Double, Double],
     nClusters: Int,
     nIterations: Int = defaultIterations,
-    sigma: Double = defaultSigma,
-    minAffinity: Double = defaultMinAffinity,
     nRuns: Int = defaultKMeansRuns)
   : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
     val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
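
With sigma and minAffinity gone from run's signature, callers must now supply a pre-built
affinity graph. A minimal sketch of the new call shape (the leading sc parameter is inferred
from the method body above; buildAffinityGraph and myPoints are hypothetical stand-ins):

    import org.apache.spark.graphx.Graph

    // Construct the affinity graph up front (however the caller chooses),
    // then hand it to PIC together with the clustering parameters.
    val affinityGraph: Graph[Double, Double] = buildAffinityGraph(sc, myPoints)
    val (centers, assignments) =
      PowerIterationClustering.run(sc, affinityGraph, nClusters = 3, nIterations = 20)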
@@ -122,45 +107,6 @@ object PowerIterationClustering {
     (ccs, estCollected)
   }
 
-  /**
-   * Create an affinity matrix
-   *
-   * @param sc Spark Context
-   * @param points Input Points in format of [(VertexId, (x, y))]
-   *               where VertexId is a Long
-   * @param sigma Sigma for the Gaussian kernel used in the affinity calculation:
-   *              exp(-(x - y)**2 / (2 * sigma**2))
-   * @param minAffinity Minimum affinity between two points in the input dataset: below
-   *                    this threshold the affinity is considered "close to" zero and
-   *                    no Edge will be created between those points in the sparse matrix
-   * @return the affinity Graph
-   */
-  def createGaussianAffinityMatrix(sc: SparkContext,
-      points: Points,
-      sigma: Double = defaultSigma,
-      minAffinity: Double = defaultMinAffinity)
-    : Graph[Double, Double] = {
-    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
-    val nVertices = points.length
-
-    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
-    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Vt(0)=${printVector(new BDV(initialVt.map(_._2).toArray))}")
-    }
-    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
-    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
-    if (logger.isDebugEnabled) {
-      logger.debug(printMatrixFromEdges(G.edges))
-    }
-    G
-  }
 
   /**
    * Create a Graph given an initial Vt0 and a set of Edges that
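
Since createGaussianAffinityMatrix is deleted, callers that relied on it must build the
affinity graph themselves before invoking run. A self-contained sketch under the same
definitions (Gaussian kernel plus a minAffinity cutoff); note it skips the row normalization
the removed helper performed, and it is an illustration rather than the removed implementation:

    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx.{Edge, Graph}

    // Gaussian affinity between two coordinate vectors: exp(-||v1 - v2||^2 / (2 * sigma^2))
    def gaussianAffinity(v1: BDV[Double], v2: BDV[Double], sigma: Double): Double = {
      val diff = v1 - v2
      Math.exp(-(diff dot diff) / (2.0 * sigma * sigma))
    }

    // Build a sparse affinity graph over (id, coordinates) points, dropping
    // affinities below minAffinity so that no edge is created for them.
    def gaussianAffinityGraph(sc: SparkContext,
                              points: Seq[(Long, BDV[Double])],
                              sigma: Double,
                              minAffinity: Double): Graph[Double, Double] = {
      val edges = for {
        (srcId, srcVec) <- points
        (dstId, dstVec) <- points
        if srcId != dstId
        w = gaussianAffinity(srcVec, dstVec, sigma)
        if w >= minAffinity
      } yield Edge(srcId, dstId, w)
      Graph.fromEdges(sc.parallelize(edges), defaultValue = 1.0)
    }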
@@ -270,154 +216,5 @@ object PowerIterationClustering {
     initialVt
   }
 
-  /**
-   * Calculate the Gaussian distance between two Vectors according to:
-   *
-   *   exp(-(X1 - X2)^2 / (2 * sigma^2))
-   *
-   * where X1 and X2 are Vectors
-   *
-   * @param vect1 Input Vector1
-   * @param vect2 Input Vector2
-   * @param sigma Gaussian parameter sigma
-   * @return the Gaussian affinity between the two vectors
-   */
-  private[mllib] def gaussianDist(vect1: BDV[Double], vect2: BDV[Double], sigma: Double) = {
-    val c1c2 = vect1.toArray.zip(vect2.toArray)
-    val dist = Math.exp((-0.5 / Math.pow(sigma, 2.0)) * c1c2.foldLeft(0.0) {
-      case (dist: Double, (c1: Double, c2: Double)) =>
-        dist + Math.pow(c1 - c2, 2)
-    })
-    dist
-  }
-
-  /**
-   * Create a sparse EdgeRDD from an array of dense vectors. The elements that
-   * are "close to" zero - as configured by the minAffinity value - do not
-   * result in an Edge being created.
-   *
-   * @param sc SparkContext
-   * @param wRdd RDD of the rows of the affinity matrix
-   * @param minAffinity threshold below which no Edge is created
-   * @return the sparse RDD of Edges
-   */
-  private[mllib] def createSparseEdgesRdd(sc: SparkContext, wRdd: RDD[IndexedVector[Double]],
-      minAffinity: Double = defaultMinAffinity) = {
-    val labels = wRdd.map { case (vid, vect) => vid }.collect
-    val edgesRdd = wRdd.flatMap { case (vid, vect) =>
-      for ((dval, ix) <- vect.toArray.zipWithIndex
-           if Math.abs(dval) >= minAffinity)
-        yield Edge(vid, labels(ix), dval)
-    }
-    edgesRdd
-  }
-
-  /**
-   * Create the normalized affinity matrix "W" given a set of Points
-   *
-   * @param sc SparkContext
-   * @param points Input Points in format of [(VertexId, (x, y))]
-   *               where VertexId is a Long
-   * @param sigma Gaussian parameter sigma
-   * @return tuple of the row-normalized affinity matrix rows and the raw row sums
-   */
-  private[mllib] def createNormalizedAffinityMatrix(sc: SparkContext,
-      points: Points, sigma: Double) = {
-    val nVertices = points.length
-    val affinityRddNotNorm = sc.parallelize({
-      val ivect = new Array[IndexedVector[Double]](nVertices)
-      for (i <- 0 until points.size) {
-        ivect(i) = new IndexedVector(points(i)._1, new BDV(Array.fill(nVertices)(100.0)))
-        for (j <- 0 until points.size) {
-          val dist = if (i != j) {
-            gaussianDist(points(i)._2, points(j)._2, sigma)
-          } else {
-            0.0
-          }
-          ivect(i)._2(j) = dist
-        }
-      }
-      ivect.zipWithIndex.map { case (vect, ix) =>
-        (ix, vect)
-      }
-    }, nVertices)
-    if (logger.isDebugEnabled) {
-      logger.debug(s"Affinity:\n${
-        printMatrix(affinityRddNotNorm.map(_._2), nVertices, nVertices)
-      }")
-    }
-    val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) =>
-      vect.foldLeft(0.0) {
-        _ + _
-      }
-    }
-    val materializedRowSums = rowSums.collect
-    val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
-      (vid, vect.map {
-        _ / materializedRowSums(rowx)
-      })
-    }
-    if (logger.isDebugEnabled) {
-      logger.debug(s"W:\n${printMatrix(similarityRdd, nVertices, nVertices)}")
-    }
-    (similarityRdd, materializedRowSums)
-  }
-
-  private[mllib] def printMatrix(denseVectorRDD: RDD[LabeledPoint], i: Int, i1: Int) = {
-    denseVectorRDD.collect.map {
-      case (vid, dvect) => dvect.toArray
-    }.flatten
-  }
-
-  private[mllib] def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = {
-    val edgec = edgesRdd.collect
-    val sorted = edgec.sortWith { case (e1, e2) =>
-      e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId)
-    }
-
-  }
-
-  private[mllib] def makeNonZero(dval: Double, tol: Double = defaultDivideByZeroVal) = {
-    if (Math.abs(dval) < tol) {
-      Math.signum(dval) * tol
-    } else {
-      dval
-    }
-  }
-
-  private[mllib] def printMatrix(mat: BDM[Double]): String =
-    printMatrix(mat, mat.rows, mat.cols)
-
-  private[mllib] def printMatrix(mat: BDM[Double], numRows: Int, numCols: Int): String =
-    printMatrix(mat.toArray, numRows, numCols)
-
-  private[mllib] def printMatrix(vectors: Array[BDV[Double]]): String = {
-    printMatrix(vectors.map {
-      _.toArray
-    }.flatten, vectors.length, vectors.length)
-  }
-
-  private[mllib] def printMatrix(vect: Array[Double], numRows: Int, numCols: Int): String = {
-    val darr = vect
-    val stride = darr.length / numCols
-    val sb = new StringBuilder
-    def leftJust(s: String, len: Int) = {
-      "         ".substring(0, len - Math.min(len, s.length)) + s
-    }
-
-    assert(darr.length == numRows * numCols,
-      s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols")
-    for (r <- 0 until numRows) {
-      for (c <- 0 until numCols) {
-        sb.append(leftJust(f"${darr(r * stride + c)}%.6f", 9) + " ")
-      }
-      sb.append("\n")
-    }
-    sb.toString
-  }
-
-  private[mllib] def printVector(dvect: BDV[Double]) = {
-    dvect.toArray.mkString(",")
-  }
 
 }
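
As a quick sanity check on the deleted kernel: for sigma = 1 and the 2-D points (0,0) and
(1,1), the squared distance is 2, so the affinity is exp(-0.5 * 2) = exp(-1) ≈ 0.3679. In
breeze terms (assuming breeze on the classpath):

    import breeze.linalg.{DenseVector => BDV}

    val a = BDV(0.0, 0.0)
    val b = BDV(1.0, 1.0)
    // Matches the removed gaussianDist with sigma = 1: exp(-0.5 * ||a - b||^2)
    val affinity = Math.exp(-0.5 * ((a - b) dot (a - b)))  // ≈ 0.36788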