Skip to content

Commit 69e1f37

Browse files
committed
remove lazy eval, and minor memory footprint
1 parent 548e9de commit 69e1f37

File tree

1 file changed

+42
-28
lines changed

1 file changed

+42
-28
lines changed

mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,35 +53,46 @@ private class VectorRDDStatisticsAggregator(
5353
val currMin: BDV[Double]) extends VectorRDDStatisticalSummary with Serializable {
5454

5555
// lazy val is used so that each value is computed only once. Same below.
56-
override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
56+
override def mean = {
57+
val realMean = BDV.zeros[Double](currMean.length)
58+
var i = 0
59+
while (i < currMean.length) {
60+
realMean(i) = currMean(i) * nnz(i) / totalCnt
61+
i += 1
62+
}
63+
Vectors.fromBreeze(realMean)
64+
}
5765

58-
override lazy val variance = {
66+
override def variance = {
67+
val realVariance = BDV.zeros[Double](currM2n.length)
5968
val deltaMean = currMean
6069
var i = 0
6170
while (i < currM2n.size) {
62-
currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
63-
currM2n(i) /= totalCnt
71+
realVariance(i) = currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
72+
realVariance(i) /= totalCnt
6473
i += 1
6574
}
66-
Vectors.fromBreeze(currM2n)
75+
Vectors.fromBreeze(realVariance)
6776
}
6877

69-
override lazy val count: Long = totalCnt.toLong
78+
override def count: Long = totalCnt.toLong
7079

71-
override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
80+
override def numNonZeros: Vector = Vectors.fromBreeze(nnz)
7281

73-
override lazy val max: Vector = {
74-
nnz.iterator.foreach {
75-
case (id, count) =>
76-
if ((count < totalCnt) && (currMax(id) < 0.0)) currMax(id) = 0.0
82+
override def max: Vector = {
83+
var i = 0
84+
while (i < nnz.length) {
85+
if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
86+
i += 1
7787
}
7888
Vectors.fromBreeze(currMax)
7989
}
8090

81-
override lazy val min: Vector = {
82-
nnz.iterator.foreach {
83-
case (id, count) =>
84-
if ((count < totalCnt) && (currMin(id) > 0.0)) currMin(id) = 0.0
91+
override def min: Vector = {
92+
var i = 0
93+
while (i < nnz.length) {
94+
if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
95+
i += 1
8596
}
8697
Vectors.fromBreeze(currMin)
8798
}
@@ -117,15 +128,16 @@ private class VectorRDDStatisticsAggregator(
117128

118129
val deltaMean = currMean - other.currMean
119130

120-
other.currMean.activeIterator.foreach {
121-
case (id, 0.0) =>
122-
case (id, value) =>
123-
currMean(id) =
124-
(currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
131+
var i = 0
132+
while (i < other.currMean.length) {
133+
if (other.currMean(i) != 0.0)
134+
currMean(i) = (currMean(i) * nnz(i) + other.currMean(i) * other.nnz(i)) /
135+
(nnz(i) + other.nnz(i))
136+
i += 1
125137
}
126138

127-
var i = 0
128-
while(i < currM2n.size) {
139+
i = 0
140+
while (i < currM2n.size) {
129141
(nnz(i), other.nnz(i)) match {
130142
case (0.0, 0.0) =>
131143
case _ => currM2n(i) +=
@@ -134,14 +146,16 @@ private class VectorRDDStatisticsAggregator(
134146
i += 1
135147
}
136148

137-
other.currMax.activeIterator.foreach {
138-
case (id, value) =>
139-
if (currMax(id) < value) currMax(id) = value
149+
i = 0
150+
while (i < other.currMax.length) {
151+
if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i)
152+
i += 1
140153
}
141154

142-
other.currMin.activeIterator.foreach {
143-
case (id, value) =>
144-
if (currMin(id) > value) currMin(id) = value
155+
i = 0
156+
while (i < other.currMin.length) {
157+
if (currMin(i) > other.currMin(i)) currMin(i) = other.currMin(i)
158+
i += 1
145159
}
146160

147161
nnz += other.nnz

0 commit comments

Comments
 (0)