28 changes: 26 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -37,13 +37,24 @@ private[feature] trait PCAParams extends Params with HasInputCol with HasOutputCol

/**
* The number of principal components.
*
* @group param
*/
final val k: IntParam = new IntParam(this, "k", "the number of principal components")

/** @group getParam */
def getK: Int = $(k)

/**
* Minimal variance retained by principal components.
*
* @group param
*/
final val requiredVariance: DoubleParam = new DoubleParam(this, "requiredVariance",
"minimal variance retained by principal components")

/** @group getParam */
def getRequiredVariance: Double = $(requiredVariance)
}

/**
@@ -63,7 +74,16 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
def setOutputCol(value: String): this.type = set(outputCol, value)

/** @group setParam */
def setK(value: Int): this.type = set(k, value)
def setK(value: Int): this.type = {
if (isSet(requiredVariance)) clear(requiredVariance)
set(k, value)
}

/** @group setParam */
def setRequiredVariance(value: Double): this.type = {
if (isSet(k)) clear(k)
set(requiredVariance, value)
}

/**
* Computes a [[PCAModel]] that contains the principal components of the input vectors.
@@ -72,7 +92,11 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
override def fit(dataset: Dataset[_]): PCAModel = {
transformSchema(dataset.schema, logging = true)
val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v}
val pca = new feature.PCA(k = $(k))
val pca = if (isSet(k)) {
new feature.PCA(k = $(k))
} else {
new feature.PCA(requiredVariance = $(requiredVariance))
}
val pcaModel = pca.fit(input)
copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
}
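A quick usage sketch of the Pipelines API change above, assuming it is run in spark-shell against this branch (so sqlContext and the mllib.linalg vector types are in scope). The column names "features"/"pca_features" and the 0.9 threshold are illustrative, not part of the diff; note that setK and setRequiredVariance are mutually exclusive because each setter clears the other param.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

// Toy 5-dimensional data wrapped into a single "features" column.
val data = Seq(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = sqlContext.createDataFrame(data.map(Tuple1.apply)).toDF("features")

// Existing behaviour: keep a fixed number of components.
val byK = new PCA()
  .setInputCol("features")
  .setOutputCol("pca_features")
  .setK(3)
  .fit(df)

// New behaviour: keep the minimal number of components that retains >= 90% of the variance.
val byVariance = new PCA()
  .setInputCol("features")
  .setOutputCol("pca_features")
  .setRequiredVariance(0.9)
  .fit(df)

byVariance.transform(df).select("pca_features").show(false)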
17 changes: 13 additions & 4 deletions mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -17,6 +17,8 @@

package org.apache.spark.mllib.feature

import java.util.Arrays

import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg._
@@ -31,7 +33,14 @@ import org.apache.spark.rdd.RDD
@Since("1.4.0")
class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
require(k > 0,
s"Number of principal components must be positive but got ${k}")
s"Number of principal components must be positive but got $k")

var pcFilter: Either[Int, Double] = Left(k)

def this(requiredVariance: Double) = {
this(k = 1)
pcFilter = Right(requiredVariance)
}

/**
* Computes a [[PCAModel]] that contains the principal components of the input vectors.
@@ -44,7 +53,7 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
s"source vector size is ${sources.first().size} must be greater than k=$k")

val mat = new RowMatrix(sources)
val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(pcFilter)
val densePC = pc match {
case dm: DenseMatrix =>
dm
@@ -66,7 +75,7 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
case sv: SparseVector =>
sv.toDense
}
new PCAModel(k, densePC, denseExplainedVariance)
new PCAModel(explainedVariance.size, densePC, denseExplainedVariance)
}

/**
@@ -109,4 +118,4 @@ class PCAModel private[spark] (
s"SparseVector or DenseVector. Instead got: ${vector.getClass}")
}
}
}
}
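For the RDD-based API, a comparable sketch (again spark-shell pseudocode rather than part of the diff); the data and the 0.9 threshold are illustrative, and new PCA(requiredVariance = 0.9) exercises the auxiliary constructor added above.

import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors

val vectors = sc.parallelize(Seq(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
), 2)

// Existing constructor: a fixed number of components.
val modelK = new PCA(k = 2).fit(vectors)

// New constructor: the minimal number of components whose cumulative
// explained variance reaches 0.9; the resulting model's k reflects that choice.
val modelVar = new PCA(requiredVariance = 0.9).fit(vectors)

println(modelVar.k)                    // components actually kept
println(modelVar.explainedVariance)    // per-component variance ratios
val projected = modelVar.transform(vectors).collect()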
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -170,8 +170,7 @@ class RowMatrix @Since("1.0.0") (
*
* @note The conditions that decide which method to use internally and the default parameters are
* subject to change.
*
* @param k number of leading singular values to keep (0 < k <= n).
* @param k number of leading singular values to keep (0 < k <= n).
* It might return less than k if
* there are numerically zero singular values or there are not enough Ritz values
* converged before the maximum number of Arnoldi update iterations is reached (in case
@@ -321,7 +320,8 @@ class RowMatrix @Since("1.0.0") (
/**
* Computes the covariance matrix, treating each row as an observation. Note that this cannot
* be computed on matrices with more than 65535 columns.
* @return a local dense matrix of size n x n
*
* @return a local dense matrix of size n x n
*/
@Since("1.0.0")
def computeCovariance(): Matrix = {
@@ -379,15 +379,21 @@ class RowMatrix @Since("1.0.0") (
*
* Note that this cannot be computed on matrices with more than 65535 columns.
*
* @param k number of top principal components.
* @param filter either the number of top principal components or variance
* retained by the minimal set of principal components.
* @return a matrix of size n-by-k, whose columns are principal components, and
* a vector of values which indicate how much variance each principal component
* explains
*/
@Since("1.6.0")
def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
def computePrincipalComponentsAndExplainedVariance(filter: Either[Int, Double])
Member: I'm no expert in the ML domain, but from a user perspective this breaks API backwards compatibility. An alternative could be to create a new method and factor out the common behaviour shared with the current computePrincipalComponentsAndExplainedVariance into a private utility method.

Author: @sethah @jodersky It looks like the Since("1.6.0") annotation is false because this method is not available in Spark 1.6 - this change was merged to master instead of the 1.6 branch. Do you still consider this change API-breaking given that it modifies an API that wasn't yet released? If yes, I'll do as @jodersky said and introduce a new method and move the common code to a new private one. I'd really like to have this feature in the MLlib version because I use it.

Member: Not sure about the breakage; nevertheless I would recommend implementing a new method regardless. I find the method's parameter type Either[Int, Double] to be quite confusing.

Member: Yeah, having one method mean two things using an Either is too strange. At the least, you would provide two overloads. And then there's no reason to overload versus giving them distinct and descriptive names.

I don't understand the question about unreleased APIs -- 1.6.0 was released a while ago and this method takes an Int parameter there. We certainly want to keep the ability to set a fixed number of principal components.

Author: This is RowMatrix as of the 1.6.1 release: https://github.com/apache/spark/blob/15de51c238a7340fa81cb0b80d029a05d97bfc5c/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala - am I correct? If yes, can you find a method named computePrincipalComponentsAndExplainedVariance there? I can't, yet on master it is annotated with Since("1.6.0") - isn't that false?

Member: Aha, you're right, it wasn't in 1.6. This is my fault: 21b3d2a
It was never added to branch 1.6, despite the apparent intention. At this point I think it should be considered 2.0+ and you can fix that annotation here. So yeah, this method was never 'released'. Still, I think we want to do something different with the argument anyway.

: (Matrix, Vector) = {
val n = numCols().toInt
require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
filter match {
case Left(k) => require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
case Right(requiredVariance) => require(requiredVariance > 0.0 && requiredVariance <= 1.0,
s"requiredVariance = $requiredVariance out of range (0, 1.0]")
}

val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]

@@ -396,6 +402,17 @@ class RowMatrix @Since("1.0.0") (
val eigenSum = s.data.sum
val explainedVariance = s.data.map(_ / eigenSum)

val k = filter match {
case Left(k) => k
case Right(requiredVariance) =>
val minFeatures = explainedVariance
.scanLeft(0.0)(_ + _)
.indexWhere(_ >= requiredVariance)
require(minFeatures > 0 && minFeatures <= n, s"minFeatures computed using " +
s"requiredVariance was $minFeatures and was out of range (0, n = $n]")
minFeatures
}

if (k == n) {
(Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
} else {
@@ -413,7 +430,7 @@ class RowMatrix @Since("1.0.0") (
*/
@Since("1.0.0")
def computePrincipalComponents(k: Int): Matrix = {
computePrincipalComponentsAndExplainedVariance(k)._1
computePrincipalComponentsAndExplainedVariance(Left(k))._1
}

/**
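The heart of the requiredVariance path above is the scanLeft/indexWhere step: the per-component explained-variance ratios sum to 1, so the minimal k is the length of the shortest prefix whose cumulative sum reaches the threshold. A self-contained illustration of just that selection step follows; componentsForVariance is a hypothetical helper (not in the PR) and the ratios are made up.

// Illustration only; mirrors explainedVariance.scanLeft(0.0)(_ + _).indexWhere(_ >= requiredVariance).
def componentsForVariance(explainedVariance: Array[Double], requiredVariance: Double): Int = {
  require(requiredVariance > 0.0 && requiredVariance <= 1.0,
    s"requiredVariance = $requiredVariance out of range (0, 1.0]")
  // scanLeft(0.0) yields cumulative sums with a leading 0.0, so the index returned
  // by indexWhere is already the count of components needed, not a zero-based index.
  explainedVariance.scanLeft(0.0)(_ + _).indexWhere(_ >= requiredVariance)
}

val ratios = Array(0.62, 0.25, 0.08, 0.04, 0.01)   // largest eigenvalue first
componentsForVariance(ratios, 0.80)                // 2: 0.62 + 0.25 covers ~87%
componentsForVariance(ratios, 0.90)                // 3: 0.62 + 0.25 + 0.08 covers ~95%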
36 changes: 28 additions & 8 deletions mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -28,6 +28,13 @@ import org.apache.spark.sql.Row

class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {

val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
lazy val dataRDD = sc.parallelize(data, 2)

test("params") {
ParamsSuite.checkParams(new PCA)
val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
@@ -37,14 +44,6 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
}

test("pca") {
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)

val dataRDD = sc.parallelize(data, 2)

val mat = new RowMatrix(dataRDD)
val pc = mat.computePrincipalComponents(3)
val expected = mat.multiply(pc).rows
@@ -81,4 +80,25 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest
val newInstance = testDefaultReadWrite(instance)
assert(newInstance.pc === instance.pc)
}

test("should return model with minimal number of features that retain given level of variance") {
// given
val df = sqlContext.createDataFrame(dataRDD.zipWithIndex()).toDF("features", "index")

// when
val trimmed = new PCA()
.setInputCol("features")
.setOutputCol("pca_features")
.setRequiredVariance(0.9)
.fit(df)

// then
val pcaWithExpectedK = new PCA()
.setInputCol("features")
.setOutputCol("pca_features")
.setK(2)
.fit(df)
assert(trimmed.explainedVariance === pcaWithExpectedK.explainedVariance)
assert(trimmed.pc === pcaWithExpectedK.pc)
}
}
mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -37,12 +37,23 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
val pca = new PCA(k).fit(dataRDD)

val mat = new RowMatrix(dataRDD)
val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(Left(k))

val pca_transform = pca.transform(dataRDD).collect()
val mat_multiply = mat.multiply(pc).rows.collect()

assert(pca_transform.toSet === mat_multiply.toSet)
assert(pca.explainedVariance === explainedVariance)
}

test("should return model with minimal number of features that retain given level of variance") {
// when
val trimmed = new PCA(requiredVariance = 0.90).fit(dataRDD)

// then
val pcaWithExpectedK = new PCA(k = 2).fit(dataRDD)
assert(trimmed.k === pcaWithExpectedK.k)
assert(trimmed.explainedVariance === pcaWithExpectedK.explainedVariance)
assert(trimmed.pc === pcaWithExpectedK.pc)
}
}
mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -204,7 +204,7 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {

test("pca") {
for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
val (pc, expVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
val (pc, expVariance) = mat.computePrincipalComponentsAndExplainedVariance(Left(k))
assert(pc.numRows === n)
assert(pc.numCols === k)
assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)