mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala (22 additions, 0 deletions)
@@ -24,6 +24,7 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,

import org.apache.spark.annotation.Experimental
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PartitionwiseSampledRDD
import org.apache.spark.util.random.BernoulliSampler
@@ -317,4 +318,25 @@ object MLUtils {
}
sqDist
}

/**
 * Compute the mean and standard deviation of each vector component and normalize each vector
 * in the RDD. The result is an RDD of Vectors in which every column has mean zero and
 * standard deviation one (constant columns have standard deviation zero and normalize to NaN).
 */
def normalizeByCol(rdd: RDD[Vector]): RDD[Vector] = {
  // Element-wise sum of two arrays of equal length.
  val plus: (Array[Double], Array[Double]) => Array[Double] = _.zip(_).map { x => x._1 + x._2 }
  // One pass: accumulate the count, per-column sums, and per-column sums of squares, then
  // derive the per-column means and Bessel-corrected sample standard deviations.
  val meansAndSdevs = rdd
    .map { v => (1, (1L, v.toArray, v.toArray.map { x => x * x })) }
    .reduceByKey { case ((count1, sums1, sqSums1), (count2, sums2, sqSums2)) =>
      (count1 + count2, plus(sums1, sums2), plus(sqSums1, sqSums2)) }
    .map { case (_, x) => x } // (count, sums, sqSums)
    .map { case (count, sum, sqSum) =>
      (sum.map(_ / count), sqSum.map(_ / count), 1.0 * count / (count - 1)) } // (means, sqMeans, fSample)
    .map { case (means, sqMeans, fSample) => (means, sqMeans.zip(means).map { case (sqMean, mean) =>
      math.sqrt(fSample * (sqMean - mean * mean)) }) }
  // meansAndSdevs holds exactly one element, so the cartesian product simply pairs every
  // vector with the (means, sdevs) tuple.
  rdd
    .cartesian(meansAndSdevs)
    .map { case (v, (means, sdevs)) =>
      v.toArray.zip(means zip sdevs).map { case (value, (mean, sdev)) => (value - mean) / sdev } }
    .map(Vectors.dense(_))
}

/** Normalize the feature vectors of an RDD of LabeledPoints by column, preserving the labels. */
def normalizeByCol(rdd: RDD[LabeledPoint])(implicit d: DummyImplicit): RDD[LabeledPoint] =
  normalizeByCol(rdd.map(_.features)).zip(rdd).map { case (newVector, oldLp) =>
    LabeledPoint(oldLp.label, newVector) }
}
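
For context, a minimal usage sketch of the two overloads (not part of the diff; assumes a
running SparkContext `sc` and the standard MLlib imports):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils.normalizeByCol

    val vectors = sc.parallelize(Seq(Vectors.dense(1.0, 10.0), Vectors.dense(3.0, 30.0)))
    val standardized = normalizeByCol(vectors) // RDD[Vector] with standardized columns

    val points = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(1.0, 10.0)),
      LabeledPoint(1.0, Vectors.dense(3.0, 30.0))))
    val standardizedPoints = normalizeByCol(points) // RDD[LabeledPoint], labels unchanged
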
mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import scala.io.Source
import scala.math

import org.scalatest.FunSuite
import org.scalatest.{FunSuite, Matchers}

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
squaredDistance => breezeSquaredDistance}
@@ -34,7 +34,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils._
import org.apache.spark.util.Utils

class MLUtilsSuite extends FunSuite with LocalSparkContext {
class MLUtilsSuite extends FunSuite with Matchers with LocalSparkContext {

test("epsilon computation") {
assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -189,4 +189,17 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
assert(points.collect().toSet === loaded.collect().toSet)
Utils.deleteRecursively(tempDir)
}

test("normalizeByCol") {
val arrays = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(1.5, 2.0, 4.0), Array(2.0, 2.0, 5.0)))
val vectors = arrays.map(Vectors.dense(_))
val normalized = normalizeByCol(vectors)
val results = normalized.map(_.toArray).collect
results(0)(0) should be(-1.0 +- 0.01)
results(1)(0) should be(0.0 +- 0.01)
results(2)(0) should be(1.0 +- 0.01)
assert(results(0)(1).isNaN)
assert(results(1)(1).isNaN)
assert(results(2)(1).isNaN)
}
}
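
As a sanity check on the expected values in the test above (plain Scala, no Spark needed;
the value names are illustrative only):

    // Column 0 of the test data: 1.0, 1.5, 2.0.
    val col0 = Seq(1.0, 1.5, 2.0)
    val mean = col0.sum / col0.size // 1.5
    val sdev = math.sqrt(col0.map(x => math.pow(x - mean, 2)).sum / (col0.size - 1)) // 0.5
    val normalized = col0.map(x => (x - mean) / sdev) // List(-1.0, 0.0, 1.0)
    // Column 1 is constant (2.0), so its sample standard deviation is 0.0
    // and (x - mean) / 0.0 evaluates to Double.NaN.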