
Commit 9e8a9d7

Anthony Truchet authored and srowen committed
[SPARK-18471][MLLIB] In LBFGS, avoid sending huge vectors of 0
## What changes were proposed in this pull request?

CostFun used to send a dense vector of zeros as part of the closure in a treeAggregate call. To avoid that, the zero value passed to treeAggregate is now an empty sparse vector, which seqOp and combOp densify on the executors (see the sketch after this description).

## How was this patch tested?

Unit tests for the mllib module, run locally for correctness. For performance, we ran a heavy optimization on our production data (50 iterations on 128 MB weight vectors) and saw a significant decrease both in runtime and in containers being killed for lack of off-heap memory.

Author: Anthony Truchet <[email protected]>
Author: sethah <[email protected]>
Author: Anthony Truchet <[email protected]>

Closes #16037 from AnthonyTruchet/ENG-17719-lbfgs-only.
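To make the pattern concrete, here is a minimal, self-contained sketch, not the committed code: the object name `SparseZeroDemo`, the helper `addInPlace`, and the toy data are illustrative, and `addInPlace` stands in for mllib's `BLAS.axpy`, which is private to Spark. The point is that `treeAggregate` serializes its zero value into every task closure, so an empty sparse vector ships in a few bytes where `Vectors.zeros(n)` would ship `n` doubles.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}

object SparseZeroDemo {
  // Stand-in for mllib's private BLAS.axpy: acc += x, in place.
  private def addInPlace(x: Vector, acc: DenseVector): Unit =
    x.foreachActive { (i, v) => acc.values(i) += v }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("sparse-zero-demo"))
    val n = 1 << 20  // dimension of the "gradient"
    // Toy per-example sparse gradients, one active entry each.
    val data = sc.parallelize(0 until 4, 4).map(i => Vectors.sparse(n, Seq((i, 1.0))))

    // The zero value travels with every task, so keep it tiny.
    val zero: Vector = Vectors.sparse(n, Seq())
    val (gradSum, count) = data.treeAggregate((zero, 0.0))(
      seqOp = (c, g) => {
        val dense = c._1.toDense  // densified on the executor, not in the closure
        addInPlace(g, dense)
        (dense, c._2 + 1.0)
      },
      combOp = (c1, c2) => {
        val d1 = c1._1.toDense  // an empty partition hands back the sparse zero
        val d2 = c2._1.toDense  // untouched, so densify on this side as well
        addInPlace(d2, d1)
        (d1, c1._2 + c2._2)
      })

    println(s"nonzeros = ${gradSum.numNonzeros}, examples seen = $count")
    sc.stop()
  }
}
```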
1 parent e57e393 · commit 9e8a9d7

File tree: 2 files changed (+37, -10 lines)


mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala

Lines changed: 18 additions & 10 deletions
```diff
@@ -241,16 +241,24 @@ object LBFGS extends Logging {
       val bcW = data.context.broadcast(w)
       val localGradient = gradient
 
-      val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
-        seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
-          val l = localGradient.compute(
-            features, label, bcW.value, grad)
-          (grad, loss + l)
-        },
-        combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
-          axpy(1.0, grad2, grad1)
-          (grad1, loss1 + loss2)
-        })
+      val seqOp = (c: (Vector, Double), v: (Double, Vector)) =>
+        (c, v) match {
+          case ((grad, loss), (label, features)) =>
+            val denseGrad = grad.toDense
+            val l = localGradient.compute(features, label, bcW.value, denseGrad)
+            (denseGrad, loss + l)
+        }
+
+      val combOp = (c1: (Vector, Double), c2: (Vector, Double)) =>
+        (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+          val denseGrad1 = grad1.toDense
+          val denseGrad2 = grad2.toDense
+          axpy(1.0, denseGrad2, denseGrad1)
+          (denseGrad1, loss1 + loss2)
+        }
+
+      val zeroSparseVector = Vectors.sparse(n, Seq())
+      val (gradientSum, lossSum) = data.treeAggregate((zeroSparseVector, 0.0))(seqOp, combOp)
 
       // broadcasted model is not needed anymore
       bcW.destroy()
```
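A note on the reshaped aggregation: `treeAggregate` serializes its zero value into each task, so the old `Vectors.zeros(n)` was shipped to every partition even though it carried no information. The empty sparse vector `Vectors.sparse(n, Seq())` is a few bytes instead. `seqOp` densifies the accumulator before handing it to `Gradient.compute`, which adds the gradient into it in place; `combOp` densifies both sides because a partition that holds no records returns the sparse zero unchanged.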

mllib/src/test/scala/org/apache/spark/mllib/optimization/LBFGSSuite.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -230,6 +230,25 @@ class LBFGSSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers
       (weightLBFGS(0) ~= weightGD(0) relTol 0.02) && (weightLBFGS(1) ~= weightGD(1) relTol 0.02),
       "The weight differences between LBFGS and GD should be within 2%.")
   }
+
+  test("SPARK-18471: LBFGS aggregator on empty partitions") {
+    val regParam = 0
+
+    val initialWeightsWithIntercept = Vectors.dense(0.0)
+    val convergenceTol = 1e-12
+    val numIterations = 1
+    val dataWithEmptyPartitions = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
+
+    LBFGS.runLBFGS(
+      dataWithEmptyPartitions,
+      gradient,
+      simpleUpdater,
+      numCorrections,
+      convergenceTol,
+      numIterations,
+      regParam,
+      initialWeightsWithIntercept)
+  }
 }
 
 class LBFGSClusterSuite extends SparkFunSuite with LocalClusterSparkContext {
```
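The new test pins down the empty-partition case: parallelizing a single record across two slices leaves one slice empty, and aggregating an empty partition simply returns the zero value (here, the sparse zero), so `combOp` must tolerate a sparse argument. A quick way to see the empty slice (illustrative spark-shell lines, not part of the patch):

```scala
val rdd = sc.parallelize(Seq((1.0, Vectors.dense(2.0))), 2)
rdd.glom().map(_.length).collect()  // one of the two partitions holds no records
```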
