Skip to content

Commit 3fd5bc8

Browse files
committed
PIClustering is running in new branch (up to the pseudo-eigenvector convergence step)
1 parent d5aae20 commit 3fd5bc8

File tree

4 files changed

+715
-250
lines changed

4 files changed

+715
-250
lines changed
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.mllib.clustering
19+
20+
import scala.util.Random
21+
22+
/**
23+
* PICLinalg
24+
*
25+
*/
26+
27+
object PICLinalg {
28+
29+
type DVector = Array[Double]
30+
type DMatrix = Array[DVector]
31+
32+
type LabeledVector = (String, DVector)
33+
34+
type IndexedVector = (Long, DVector)
35+
36+
type Vertices = Seq[LabeledVector]
37+
38+
def add(v1: DVector, v2: DVector) =
39+
v1.zip(v2).map { x => x._1 + x._2}
40+
41+
def mult(v1: DVector, d: Double) = {
42+
v1.map {
43+
_ * d
44+
}
45+
}
46+
47+
def mult(v1: DVector, v2: DVector) = {
48+
v1.zip(v2).map { case (v1v, v2v) => v1v * v2v}
49+
}
50+
51+
def multColByRow(v1: DVector, v2: DVector) = {
52+
val mat = for (v1v <- v1)
53+
yield mult(v2, v1v)
54+
// println(s"Col by Row:\n${printMatrix(mat,
55+
// v1.length, v1.length)}")
56+
mat
57+
}
58+
59+
def norm(vect: DVector): Double = {
60+
Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)})
61+
}
62+
63+
def manhattanNorm(vect: DVector): Double = {
64+
val n = vect.foldLeft(0.0) { case (sum, dval) => sum + Math.abs(dval)}
65+
n / Math.sqrt(vect.size)
66+
}
67+
68+
def dot(v1: DVector, v2: DVector) = {
69+
v1.zip(v2).foldLeft(0.0) {
70+
case (sum, (b, p)) => sum + b * p
71+
}
72+
}
73+
74+
def onesVector(len: Int): DVector = {
75+
Array.fill(len)(1.0)
76+
}
77+
78+
val calcEigenDiffs = true
79+
80+
def withinTol(d: Double, tol: Double = DefaultTolerance) = Math.abs(d) <= tol
81+
82+
val DefaultTolerance: Double = 1e-8
83+
84+
def makeNonZero(dval: Double, tol: Double = DefaultTolerance) = {
85+
if (Math.abs(dval) < tol) {
86+
Math.signum(dval) * tol
87+
} else {
88+
dval
89+
}
90+
}
91+
92+
def transpose(mat: DMatrix) = {
93+
val nCols = mat(0).length
94+
val matT = mat
95+
.flatten
96+
.zipWithIndex
97+
.groupBy {
98+
_._2 % nCols
99+
}
100+
.toSeq.sortBy {
101+
_._1
102+
}
103+
.map(_._2)
104+
// .map(_.toSeq.sortBy(_._1))
105+
.map(_.map(_._1))
106+
.toArray
107+
matT
108+
}
109+
110+
def printMatrix(mat: Array[Array[Double]]): String
111+
= printMatrix(mat, mat.length, mat.length)
112+
113+
def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = {
114+
val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) {
115+
case (flatarr, (row, indx)) =>
116+
System.arraycopy(row, 0, flatarr, indx * numCols, numCols)
117+
flatarr
118+
}
119+
printMatrix(flattenedArr, numRows, numCols)
120+
}
121+
122+
def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = {
123+
val stride = (darr.length / numCols)
124+
val sb = new StringBuilder
125+
def leftJust(s: String, len: Int) = {
126+
" ".substring(0, len - Math.min(len, s.length)) + s
127+
}
128+
129+
for (r <- 0 until numRows) {
130+
for (c <- 0 until numCols) {
131+
sb.append(leftJust(f"${darr(r * stride + c)}%.6f", 9) + " ")
132+
}
133+
sb.append("\n")
134+
}
135+
sb.toString
136+
}
137+
138+
def printVect(dvect: DVector) = {
139+
dvect.mkString(",")
140+
}
141+
142+
def project(basisVector: DVector, inputVect: DVector) = {
143+
val pnorm = makeNonZero(norm(basisVector))
144+
val projectedVect = basisVector.map(
145+
_ * dot(basisVector, inputVect) / dot(basisVector, basisVector))
146+
projectedVect
147+
}
148+
149+
def subtract(v1: DVector, v2: DVector) = {
150+
val subvect = v1.zip(v2).map { case (v1val, v2val) => v1val - v2val}
151+
subvect
152+
}
153+
154+
def subtractProjection(vect: DVector, basisVect: DVector): DVector = {
155+
val proj = project(basisVect, vect)
156+
val subVect = subtract(vect, proj)
157+
subVect
158+
}
159+
160+
def localPIC(matIn: DMatrix, nClusters: Int, nIterations: Int,
161+
optExpected: Option[(DVector, DMatrix)]) = {
162+
163+
var mat = matIn.map(identity)
164+
val numVects = mat.length
165+
166+
val (expLambda, expdat) = optExpected.getOrElse((new DVector(0), new DMatrix(0)))
167+
var cnorm = -1.0
168+
for (k <- 0 until nClusters) {
169+
val r = new Random()
170+
var eigen = Array.fill(numVects) {
171+
// 1.0
172+
r.nextDouble
173+
}
174+
val enorm = norm(eigen)
175+
eigen.map { e => e / enorm}
176+
177+
for (iter <- 0 until nIterations) {
178+
eigen = mat.map { dvect =>
179+
dot(dvect, eigen)
180+
}
181+
cnorm = makeNonZero(norm(eigen))
182+
eigen = eigen.map(_ / cnorm)
183+
}
184+
val signum = Math.signum(dot(mat(0), eigen))
185+
val lambda = dot(mat(0), eigen) / eigen(0)
186+
eigen = eigen.map(_ * signum)
187+
println(s"lambda=$lambda eigen=${printVect(eigen)}")
188+
if (expLambda.length > 0) {
189+
val compareVect = eigen.zip(expdat(k)).map { case (a, b) => a / b}
190+
println(s"Ratio to expected: lambda=${lambda / expLambda(k)} " +
191+
s"Vect=${compareVect.mkString("[", ",", "]")}")
192+
}
193+
if (k < nClusters - 1) {
194+
// TODO: decide between deflate/schurComplement
195+
mat = schurComplement(mat, lambda, eigen)
196+
}
197+
}
198+
}
199+
200+
def compareVectors(v1: Array[Double], v2: Array[Double]) = {
201+
v1.zip(v2).forall { case (v1v, v2v) => withinTol(v1v - v2v)}
202+
}
203+
204+
def compareMatrices(m1: DMatrix, m2: DMatrix) = {
205+
m1.zip(m2).forall { case (m1v, m2v) =>
206+
m1v.zip(m2v).forall { case (m1vv, m2vv) => withinTol(m1vv - m2vv)}
207+
}
208+
}
209+
210+
def subtract(mat1: DMatrix, mat2: DMatrix) = {
211+
mat1.zip(mat2).map { case (m1row, m2row) =>
212+
m1row.zip(m2row).map { case (m1v, m2v) => m1v - m2v}
213+
}
214+
}
215+
216+
def deflate(mat: DMatrix, lambda: Double, eigen: DVector) = {
217+
// mat = mat.map(subtractProjection(_, mult(eigen, lambda)))
218+
val eigT = eigen
219+
val projected = multColByRow(eigen, eigT).map(mult(_, lambda))
220+
// println(s"projected matrix:\n${printMatrix(projected,
221+
// eigen.length, eigen.length)}")
222+
val matOut = mat.zip(projected).map { case (mrow, prow) =>
223+
subtract(mrow, prow)
224+
}
225+
println(s"Updated matrix:\n${
226+
printMatrix(mat,
227+
eigen.length, eigen.length)
228+
}")
229+
matOut
230+
}
231+
232+
def mult(mat1: DMatrix, mat2: DMatrix) = {
233+
val mat2T = transpose(mat2)
234+
val outmatT = for {row <- mat1}
235+
yield {
236+
val outRow = mat2T.map { col =>
237+
dot(row, col)
238+
}
239+
outRow
240+
}
241+
outmatT
242+
}
243+
244+
// def mult(mat: DMatrix, vect: DVector): DMatrix = {
245+
// val outMat = mat.map { m =>
246+
// mult(m, vect)
247+
// }
248+
// outMat
249+
// }
250+
//
251+
// def mult(vect: DVector, mat: DMatrix): DMatrix = {
252+
// for {d <- vect.zip(transpose(mat)) }
253+
// yield mult(d._2, d._1)
254+
// }
255+
256+
def scale(mat: DMatrix, d: Double): DMatrix = {
257+
for (row <- mat) yield mult(row, d)
258+
}
259+
260+
def transpose(vector: DVector) = {
261+
vector.map { d => Array(d)}
262+
}
263+
264+
def toMat(dvect: Array[Double], ncols: Int) = {
265+
val m = dvect.toSeq.grouped(ncols).map(_.toArray).toArray
266+
m
267+
}
268+
269+
def schurComplement(mat: DMatrix, lambda: Double, eigen: DVector) = {
270+
val eigT = toMat(eigen, eigen.length) // The sense is reversed
271+
val eig = transpose(eigT)
272+
val projected = mult(eig, eigT)
273+
println(s"projected matrix:\n${
274+
printMatrix(projected,
275+
eigen.length, eigen.length)
276+
}")
277+
val numerat1 = mult(mat, projected)
278+
val numerat2 = mult(numerat1, mat)
279+
println(s"numerat2=\n${
280+
printMatrix(numerat2,
281+
eigen.length, eigen.length)
282+
}")
283+
val denom1 = mult(eigT, mat)
284+
val denom2 = mult(denom1, toMat(eigen, 1))
285+
val denom = denom2(0)(0)
286+
println(s"denom is $denom")
287+
val projMat = scale(numerat2, 1.0 / denom)
288+
println(s"Updated matrix:\n${
289+
printMatrix(projMat,
290+
eigen.length, eigen.length)
291+
}")
292+
val defMat = subtract(mat, projMat)
293+
println(s"deflated matrix:\n${
294+
printMatrix(defMat,
295+
eigen.length, eigen.length)
296+
}")
297+
defMat
298+
}
299+
300+
}
301+

0 commit comments

Comments
 (0)