Skip to content

Commit 28cf060

Browse files
committed
fix error of column means
1 parent 54b19ab commit 28cf060

File tree

2 files changed

+31
-39
lines changed

2 files changed

+31
-39
lines changed

mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,43 +43,42 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
4343
.map{ x => math.sqrt(x.toArray.map(x => x*x).sum / x.size) }
4444
}
4545

46-
def colMeansOption(): Vector = {
47-
???
46+
def colMeans(): Vector = colMeans(self.take(1).head.size)
47+
48+
def colMeans(size: Int): Vector = {
49+
Vectors.fromBreeze(self.map(_.toBreeze).aggregate((BV.zeros[Double](size), 0.0))(
50+
seqOp = (c, v) => (c, v) match {
51+
case ((prev, cnt), current) =>
52+
(((prev :* cnt) + current) :/ (cnt + 1.0), cnt + 1.0)
53+
},
54+
combOp = (lhs, rhs) => (lhs, rhs) match {
55+
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
56+
((lhsVec :* lhsCnt) + (rhsVec :* rhsCnt) :/ (lhsCnt + rhsCnt), lhsCnt + rhsCnt)
57+
}
58+
)._1)
4859
}
4960

50-
def colNorm2Option(): Vector = {
51-
???
52-
}
53-
54-
def colSDsOption(): Vector = {
55-
???
56-
}
57-
58-
def colMeans(): Vector = {
59-
Vectors.fromBreeze(self.map(_.toBreeze).zipWithIndex().fold((BV.zeros(1), 0L)) {
60-
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
61-
val totalNow: BV[Double] = lhsVec :* lhsCnt.asInstanceOf[Double]
62-
val totalNew: BV[Double] = (totalNow + rhsVec) :/ rhsCnt.asInstanceOf[Double]
63-
(totalNew, rhsCnt)
64-
}._1)
65-
}
61+
def colNorm2(): Vector = colNorm2(self.take(1).head.size)
6662

67-
def colNorm2(): Vector = Vectors.fromBreeze(
68-
breezeVector = self.map(_.toBreeze).fold(BV.zeros(1)) {
69-
case (lhs, rhs) => lhs + rhs :* rhs
63+
def colNorm2(size: Int): Vector = Vectors.fromBreeze(self.map(_.toBreeze).fold(BV.zeros[Double](size)) {
64+
case (lhs, rhs) =>
65+
lhs + (rhs :* rhs)
7066
}.map(math.sqrt))
7167

72-
def colSDs(): Vector = {
68+
def colSDs(): Vector = colSDs(self.take(1).head.size)
69+
70+
def colSDs(size: Int): Vector = {
7371
val means = this.colMeans()
74-
Vectors.fromBreeze(
75-
breezeVector = self.map(x => x.toBreeze - means.toBreeze)
76-
.zipWithIndex()
77-
.fold((BV.zeros(1), 0L)) {
78-
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
79-
val totalNow: BV[Double] = lhsVec :* lhsCnt.asInstanceOf[Double]
80-
val totalNew: BV[Double] = (totalNow + rhsVec :* rhsVec) :/ rhsCnt.asInstanceOf[Double]
81-
(totalNew, rhsCnt)
82-
}._1.map(math.sqrt))
72+
Vectors.fromBreeze(self.map(x => x.toBreeze - means.toBreeze).aggregate((BV.zeros[Double](size), 0.0))(
73+
seqOp = (c, v) => (c, v) match {
74+
case ((prev, cnt), current) =>
75+
(((prev :* cnt) + current) :/ (cnt + 1.0), cnt + 1.0)
76+
},
77+
combOp = (lhs, rhs) => (lhs, rhs) match {
78+
case ((lhsVec, lhsCnt), (rhsVec, rhsCnt)) =>
79+
((lhsVec :* lhsCnt) + (rhsVec :* rhsCnt) :/ (lhsCnt + rhsCnt), lhsCnt + rhsCnt)
80+
}
81+
)._1.map(math.sqrt))
8382
}
8483

8584
private def maxMinOption(cmp: (Vector, Vector) => Boolean): Option[Vector] = {
@@ -99,12 +98,4 @@ class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
9998
val means = self.colMeans()
10099
self.map( v => Vectors.dense(v.toArray.zip(means.toArray).filter{ case (x, m) => m != 0.0 }.map(_._1)))
101100
}
102-
103-
def colShrinkWithFilter(): (RDD[Vector], RDD[Boolean]) = {
104-
???
105-
}
106-
107-
def rowShrinkWithFilter(): (RDD[Vector], RDD[Boolean]) = {
108-
???
109-
}
110101
}

mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
7171

7272
test("colSDs") {
7373
val data = sc.parallelize(localData)
74+
val test = data.colSDs()
7475
assert(equivVector(data.colSDs(), Vectors.dense(colSDs)), "Column SDs do not match.")
7576
}
7677

0 commit comments

Comments
 (0)