Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import com.github.fommil.netlib.ARPACK
import org.netlib.util.{intW, doubleW}

import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
* Represents eigenvalue decomposition factors.
*/
@Experimental
case class EigenValueDecomposition[VType](s: Vector, V: VType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this in the first pass. This class is not used any where. Please remove it. Also, it is hard to maintain binary compatibility if we have both a case class and a companion object.


object EigenValueDecomposition {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used internally. So please mark it private[mllib].

/**
* Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
* The caller needs to ensure that the input matrix is real symmetric. This function requires
* memory for `n*(4*k+4)` doubles.
*
* @param mul a function that multiplies the symmetric matrix with a Vector.
* @param n dimension of the square matrix (maximum Int.MaxValue).
* @param k number of leading eigenvalues required.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add 0 < k < n.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to comment

* @param tol tolerance of the eigs computation.
* @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
* (columns of the matrix). The number of computed eigenvalues might be smaller than k.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain when the number of computed eigenvalues is smaller than k.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add explanation in comment

*/
private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it implemented in breeze-0.8? If so, please add a TODO here so we will switch to the breeze implementation if we upgrade breeze.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current implementation in breeze-0.8 is for svd, it computes both U and V in memory. I can refactor that part to move the code block calling ARPACK to eig so only V is computed. TODO added

: (BDV[Double], BDM[Double]) = {
require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")

val arpack = ARPACK.getInstance()

val tolW = new doubleW(tol)
val nev = new intW(k)
val ncv = scala.math.min(2 * k, n)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

math.min should be sufficient.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


val bmat = "I"
val which = "LM"

var iparam = new Array[Int](11)
iparam(0) = 1
iparam(2) = 300
iparam(6) = 1

var ido = new intW(0)
var info = new intW(0)
var resid:Array[Double] = new Array[Double](n)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove :Array[Double]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

var v = new Array[Double](n * ncv)
var workd = new Array[Double](n * 3)
var workl = new Array[Double](ncv * (ncv + 8))
var ipntr = new Array[Int](11)

// first call to ARPACK
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
workl, workl.length, info)

val w = BDV(workd)

while(ido.`val` != 99) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a space between while and (
IntelliJ handles all these - probably wise to run your code through it.
Checkout the Spark style guide https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please comment what these codes mean and also give some indication what they mean in the return exception

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space added between while and (

if (ido.`val` != -1 && ido.`val` != 1) {
throw new IllegalStateException("ARPACK returns ido = " + ido.`val`)
}
// multiply working vector with the matrix
val inputOffset = ipntr(0) - 1
val outputOffset = ipntr(1) - 1
val x = w(inputOffset until inputOffset + n)
val y = w(outputOffset until outputOffset + n)
y := BDV(mul(Vectors.fromBreeze(x)).toArray)
// call ARPACK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please comment the calls to ARPACK with a one-liner of what they do - arpack method names are opaque and Spark code shouldn't suffer too much because of it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improved the comments for ARPACK calls. This part should be moved into breeze's eig object. Here is a temporary solution before switching to the new version of breeze.

arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
workd, workl, workl.length, info)
}

if (info.`val` != 0) {
throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`)
}

val d = new Array[Double](nev.`val`)
val select = new Array[Boolean](ncv)
val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)

arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
iparam, ipntr, workd, workl, workl.length, info)

val computed = iparam(4)

val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map { (space)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

r => (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move r => up

}

val sortedEigenPairs = eigenPairs.sortBy(-1 * _._1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: - _._1 may look faster ...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace multiplication with negation


// copy eigenvectors in descending order of eigenvalues
val sortedU = BDM.zeros[Double](n, computed)
sortedEigenPairs.zipWithIndex.map{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use foreach instead of map. If this is an iterator, map doesn't act.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use foreach

r => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this line up

for (i <- 0 until n) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use while instead of for

sortedU.data(r._2 * n + i) = r._1._2(i)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r._2 * n could be cached.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created a val for r._2 * n

}
}
}

(BDV(sortedEigenPairs.map(_._1)), sortedU)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ class RowMatrix(
nRows
}

/**
* Multiply the Gramian matrix `A^T A` by a Vector on the right.
*
* @param v a local vector whose length must match the number of columns of this matrix
* @return a local DenseVector representing the product
*/
private[mllib] def multiplyGramianMatrix(v: Vector): Vector = {
val bv = rows.map{
row => row.toBreeze * row.toBreeze.dot(v.toBreeze)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would create many temp objects. Please use RDD#aggregate and axpy. The initial vector should be a dense vector.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the code in computeGramianMatrix as an example.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to use aggregrate and axpy (from breeze). There are still some boxing/unboxing overhead from/to breeze objects. I can further improve if needed.

}.reduce( (x: BV[Double], y: BV[Double]) => x + y )
Vectors.fromBreeze(bv)
}

/**
* Computes the Gramian matrix `A^T A`.
*/
Expand All @@ -221,15 +234,20 @@ class RowMatrix(

/**
* Computes the singular value decomposition of this matrix.
* Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
* Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V',
* where S contains the leading singular values, U and V contain the corresponding singular
* vectors.
*
* There is no restriction on m, but we require `n^2` doubles to fit in memory.
* Further, n should be less than m.

* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small), from which we recover S and V.
* Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
* Note that this approach requires `O(n^3)` time on the master node.
* There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master
* node. Further, n should be less than m.
*
* The decomposition is computed by providing a function that multiples a vector with A'A to
* ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V.
* Then we compute U via easy matrix multiplication as U = A * (V * S-1).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"S-1" -> "S^-1" or "S^{-1}"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed. Somehow my intellij treated ^ as a tag and matched it with a ^ in another line. They both got deleted when I was editing the other line...

* Note that this approach requires `O(nnz(A))` time.
*
* When the requested eigenvalues k = n, a non-sparse implementation will be used, which requires
* `n^2` doubles to fit in memory and `O(n^3)` time on the master node.
*
* At most k largest non-zero singular values and associated vectors are returned.
* If there are k such values, then the dimensions of the return will be:
Expand All @@ -243,20 +261,27 @@ class RowMatrix(
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
* are treated as zero, where sigma(0) is the largest singular value.
* @param tol the tolerance of the svd computation.
* @return SingularValueDecomposition(U, s, V)
*/
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
rCond: Double = 1e-9,
tol: Double = 1e-6): SingularValueDecomposition[RowMatrix, Matrix] = {
val n = numCols().toInt
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")

val G = computeGramianMatrix()

// TODO: Use sparse SVD instead.
val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
val (sigmaSquares: BDV[Double], u: BDM[Double]) = if (k < n) {
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol)
} else {
logWarning(s"Request full SVD (k = n = $k), while ARPACK requires k strictly less than n. " +
s"Using non-sparse implementation.")
val G = computeGramianMatrix()
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) =
brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
(sigmaSquaresFull, uFull)
}
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)

// Determine effective rank.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
}
val svdWithoutU = mat.computeSVD(n)
val svdWithoutU = mat.computeSVD(n - 1)
assert(svdWithoutU.U === null)
}
}

test("svd of a low-rank matrix") {
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
val mat = new RowMatrix(rows, 4, 2)
val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
val mat = new RowMatrix(rows, 4, 3)
val svd = mat.computeSVD(2, computeU = true)
assert(svd.s.size === 1, "should not return zero singular values")
assert(svd.U.numRows() === 4)
assert(svd.U.numCols() === 1)
assert(svd.V.numRows === 2)
assert(svd.V.numRows === 3)
assert(svd.V.numCols === 1)
}

Expand Down