diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index dc10a194783e..5acd4d128514 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -24,6 +24,7 @@ import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
 import org.apache.spark.util.random.BernoulliSampler
@@ -317,4 +318,25 @@ object MLUtils {
     }
     sqDist
   }
+
+  /**
+   * Compute the mean and standard deviation of each column and normalize each vector in the RDD.
+   * The result is an RDD of Vectors in which each column has mean zero and standard deviation one.
+   */
+  def normalizeByCol(rdd: RDD[Vector]): RDD[Vector] = {
+    val plus: (Array[Double], Array[Double]) => Array[Double] = _.zip(_).map{ x => x._1 + x._2 }
+    val meansAndSdevs = rdd
+      .map{ v => (1, (1L, v.toArray, v.toArray.map{ x => x * x })) }
+      .reduceByKey{ case ((count1, sums1, sqSums1), (count2, sums2, sqSums2)) => (count1 + count2, plus(sums1, sums2), plus(sqSums1, sqSums2)) }
+      .map{ case (_, x) => x } // contains (count, sums, sqSums)
+      .map{ case (count, sum, sqSum) => (sum.map(_ / count), sqSum.map(_ / count), 1.0 * count / (count - 1)) } // contains (means, sqMeans, fSample)
+      .map{ case (means, sqMeans, fSample) => (means, sqMeans.zip(means).map{ case (sqMean, mean) => math.pow(fSample * (sqMean - mean * mean), 0.5) }) }
+    rdd
+      .cartesian(meansAndSdevs)
+      .map{ case (v, (means, sdevs)) => v.toArray.zip(means zip sdevs).map{ case (value, (mean, sdev)) => (value - mean) / sdev } }
+      .map(Vectors.dense(_))
+  }
+
+  def normalizeByCol(rdd: RDD[LabeledPoint])(implicit d: DummyImplicit): RDD[LabeledPoint] =
+    normalizeByCol(rdd.map(_.features)).zip(rdd).map{ case (newVector, oldLp) => LabeledPoint(oldLp.label, newVector) }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 8ef2bb1bf6a7..0571e2ad54b2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import scala.io.Source
 import scala.math
 
-import org.scalatest.FunSuite
+import org.scalatest.{FunSuite, Matchers}
 
 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
@@ -34,7 +34,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils._
 import org.apache.spark.util.Utils
 
-class MLUtilsSuite extends FunSuite with LocalSparkContext {
+class MLUtilsSuite extends FunSuite with Matchers with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -189,4 +189,17 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
     assert(points.collect().toSet === loaded.collect().toSet)
     Utils.deleteRecursively(tempDir)
   }
+
+  test("normalizeByCol") {
+    val arrays = sc.parallelize(List(Array(1.0, 2.0, 3.0), Array(1.5, 2.0, 4.0), Array(2.0, 2.0, 5.0)))
+    val vectors = arrays.map(Vectors.dense(_))
+    val normalized = normalizeByCol(vectors)
+    val results = normalized.map(_.toArray).collect()
+    results(0)(0) should be(-1.0 +- 0.01)
+    results(1)(0) should be(0.0 +- 0.01)
+    results(2)(0) should be(1.0 +- 0.01)
+    assert(results(0)(1).isNaN)
+    assert(results(1)(1).isNaN)
+    assert(results(2)(1).isNaN)
+  }
 }
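
Not part of the patch: a minimal usage sketch of the two new normalizeByCol overloads, assuming an already-created SparkContext named sc and reusing the small fixture from the test above.

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils.normalizeByCol

  // Three 3-dimensional vectors; the second column is constant across rows.
  val vectors = sc.parallelize(Seq(
    Vectors.dense(1.0, 2.0, 3.0),
    Vectors.dense(1.5, 2.0, 4.0),
    Vectors.dense(2.0, 2.0, 5.0)))

  // Each column is shifted to mean 0 and scaled to (sample) standard deviation 1.
  // The constant second column has zero variance, so its entries come out as NaN,
  // which is what the test above asserts.
  normalizeByCol(vectors).collect().foreach(println)

  // The LabeledPoint overload normalizes the features and carries the labels through.
  val points = vectors.zipWithIndex().map { case (v, i) => LabeledPoint(i.toDouble, v) }
  normalizeByCol(points).collect().foreach(println)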