From 533d2eaf266efb0449f235929d8e6bb061b3f19e Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 8 Jul 2016 11:53:33 -0700
Subject: [PATCH 1/6] remove unnecessary serialization in linear regression
---
.../ml/regression/LinearRegression.scala | 78 +++++++++----------
1 file changed, 37 insertions(+), 41 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 6d5e398dfe15..a43c80a6d209 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -862,46 +862,17 @@ class LinearRegressionSummary private[regression] (
* $$
*
*
- * @param coefficients The coefficients corresponding to the features.
* @param labelStd The standard deviation value of the label.
- * @param labelMean The mean value of the label.
- * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
*/
private class LeastSquaresAggregator(
- coefficients: Vector,
- labelStd: Double,
- labelMean: Double,
- fitIntercept: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double]) extends Serializable {
+ private val numFeatures: Int,
+ labelStd: Double) extends Serializable {
private var totalCnt: Long = 0L
private var weightSum: Double = 0.0
private var lossSum = 0.0
- private val (effectiveCoefficientsArray: Array[Double], offset: Double, dim: Int) = {
- val coefficientsArray = coefficients.toArray.clone()
- var sum = 0.0
- var i = 0
- val len = coefficientsArray.length
- while (i < len) {
- if (featuresStd(i) != 0.0) {
- coefficientsArray(i) /= featuresStd(i)
- sum += coefficientsArray(i) * featuresMean(i)
- } else {
- coefficientsArray(i) = 0.0
- }
- i += 1
- }
- val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
- (coefficientsArray, offset, coefficientsArray.length)
- }
-
- private val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
-
- private val gradientSumArray = Array.ofDim[Double](dim)
+ private val gradientSumArray = Array.ofDim[Double](numFeatures)
/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
@@ -910,10 +881,14 @@ private class LeastSquaresAggregator(
* @param instance The instance of data point to be added.
* @return This LeastSquaresAggregator object.
*/
- def add(instance: Instance): this.type = {
+ def add(
+ instance: Instance,
+ effectiveCoefficientsVector: Vector,
+ offset: Double,
+ featuresStd: Array[Double]): this.type = {
instance match { case Instance(label, weight, features) =>
- require(dim == features.size, s"Dimensions mismatch when adding new sample." +
- s" Expecting $dim but got ${features.size}.")
+ require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
+ s" Expecting $numFeatures but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
@@ -945,8 +920,8 @@ private class LeastSquaresAggregator(
* @return This LeastSquaresAggregator object.
*/
def merge(other: LeastSquaresAggregator): this.type = {
- require(dim == other.dim, s"Dimensions mismatch when merging with another " +
- s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
+ require(numFeatures == other.numFeatures, s"Dimensions mismatch when merging with another " +
+ s"LeastSquaresAggregator. Expecting $numFeatures but got ${other.numFeatures}.")
if (other.weightSum != 0) {
totalCnt += other.totalCnt
@@ -956,7 +931,7 @@ private class LeastSquaresAggregator(
var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
- while (i < dim) {
+ while (i < numFeatures) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
@@ -998,14 +973,35 @@ private class LeastSquaresCostFun(
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
+ val localFeaturesStd = featuresStd
+
+ val (effectiveCoefficientsArray: Array[Double], offset: Double) = {
+ val coefficientsArray = coefficients.toArray.clone()
+ var sum = 0.0
+ var i = 0
+ val len = coefficientsArray.length
+ while (i < len) {
+ if (featuresStd(i) != 0.0) {
+ coefficientsArray(i) /= featuresStd(i)
+ sum += coefficientsArray(i) * featuresMean(i)
+ } else {
+ coefficientsArray(i) = 0.0
+ }
+ i += 1
+ }
+ val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
+ (coefficientsArray, offset)
+ }
+
+ val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
val leastSquaresAggregator = {
- val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
+ val seqOp = (c: LeastSquaresAggregator, instance: Instance) =>
+ c.add(instance, effectiveCoefficientsVector, offset, localFeaturesStd)
val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
instances.treeAggregate(
- new LeastSquaresAggregator(coeffs, labelStd, labelMean, fitIntercept, featuresStd,
- featuresMean))(seqOp, combOp)
+ new LeastSquaresAggregator(coeffs.size, labelStd))(seqOp, combOp)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
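[Note on patch 1] This first patch moves the standardization of the coefficients out of LeastSquaresAggregator's constructor and into LeastSquaresCostFun.calculate, so each serialized aggregator carries only numFeatures, labelStd, and its running sums; the effective coefficients, the offset, and featuresStd now reach the executors through the seqOp closure instead of as aggregator fields. The rescaling math itself is unchanged. A minimal standalone sketch of that computation, distilled from the hunk above (not the Spark source):

    // Rescale coefficients into the standardized feature space and fold the
    // intercept into a scalar offset, exactly as in the moved block.
    def effectiveCoefAndOffset(
        coefficients: Array[Double],
        featuresStd: Array[Double],
        featuresMean: Array[Double],
        labelStd: Double,
        labelMean: Double,
        fitIntercept: Boolean): (Array[Double], Double) = {
      val effective = coefficients.clone()
      var sum = 0.0
      var i = 0
      while (i < effective.length) {
        if (featuresStd(i) != 0.0) {
          effective(i) /= featuresStd(i)         // divide by per-feature std dev
          sum += effective(i) * featuresMean(i)  // accumulate the mean correction
        } else {
          effective(i) = 0.0                     // constant feature contributes nothing
        }
        i += 1
      }
      val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
      (effective, offset)
    }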
From 152304c09470288a7c4e6d5b5cad1213b22d2dfb Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 8 Jul 2016 15:45:07 -0700
Subject: [PATCH 2/6] using transient
---
.../ml/regression/LinearRegression.scala | 101 +++++++++++-------
1 file changed, 63 insertions(+), 38 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index a43c80a6d209..f6055321ab06 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ml.regression
+import org.apache.spark.broadcast.Broadcast
+
import scala.collection.mutable
import breeze.linalg.{DenseVector => BDV}
@@ -82,6 +84,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the regularization parameter.
* Default is 0.0.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -91,6 +94,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set if we should fit the intercept
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -104,6 +108,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* the models should be always converged to the same solution when no regularization
* is applied. In R's GLMNET package, the default behavior is true as well.
* Default is true.
+ *
* @group setParam
*/
@Since("1.5.0")
@@ -115,6 +120,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
* For 0 < alpha < 1, the penalty is a combination of L1 and L2.
* Default is 0.0 which is an L2 penalty.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -124,6 +130,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
/**
* Set the maximum number of iterations.
* Default is 100.
+ *
* @group setParam
*/
@Since("1.3.0")
@@ -134,6 +141,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Set the convergence tolerance of iterations.
* Smaller value will lead to higher accuracy with the cost of more iterations.
* Default is 1E-6.
+ *
* @group setParam
*/
@Since("1.4.0")
@@ -144,6 +152,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* Whether to over-/under-sample training instances according to the given weights in weightCol.
* If not set or empty, all instances are treated equally (weight 1.0).
* Default is not set, so all instances have weight one.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -157,6 +166,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
* solution to the linear regression problem.
* The default value is "auto" which means that the solver algorithm is
* selected automatically.
+ *
* @group setParam
*/
@Since("1.6.0")
@@ -419,6 +429,7 @@ class LinearRegressionModel private[ml] (
/**
* Evaluates the model on a test dataset.
+ *
* @param dataset Test dataset to evaluate model on.
*/
@Since("2.0.0")
@@ -544,6 +555,7 @@ class LinearRegressionTrainingSummary private[regression] (
* Number of training iterations until termination
*
* This value is only available when using the "l-bfgs" solver.
+ *
* @see [[LinearRegression.solver]]
*/
@Since("1.5.0")
@@ -862,17 +874,49 @@ class LinearRegressionSummary private[regression] (
* $$
*
*
+ * @param bcCoefficients The broadcast coefficients corresponding to the features.
* @param labelStd The standard deviation value of the label.
+ * @param labelMean The mean value of the label.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcFeaturesMean The broadcast mean values of the features.
*/
private class LeastSquaresAggregator(
- private val numFeatures: Int,
- labelStd: Double) extends Serializable {
+ bcCoefficients: Broadcast[Vector],
+ labelStd: Double,
+ labelMean: Double,
+ fitIntercept: Boolean,
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]]) extends Serializable {
private var totalCnt: Long = 0L
private var weightSum: Double = 0.0
private var lossSum = 0.0
- private val gradientSumArray = Array.ofDim[Double](numFeatures)
+ private val dim = bcCoefficients.value.size
+ @transient private lazy val featuresStd = bcFeaturesStd.value
+ @transient private lazy val coefAndOffset = {
+ val coefficientsArray = bcCoefficients.value.toArray.clone()
+ val featuresMean = bcFeaturesMean.value
+ var sum = 0.0
+ var i = 0
+ val len = coefficientsArray.length
+ while (i < len) {
+ if (featuresStd(i) != 0.0) {
+ coefficientsArray(i) /= featuresStd(i)
+ sum += coefficientsArray(i) * featuresMean(i)
+ } else {
+ coefficientsArray(i) = 0.0
+ }
+ i += 1
+ }
+ val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
+ (Vectors.dense(coefficientsArray), offset)
+ }
+ @transient private lazy val effectiveCoefficientsVector = coefAndOffset._1
+ @transient private lazy val offset = coefAndOffset._2
+
+ private val gradientSumArray = Array.ofDim[Double](dim)
/**
* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient
@@ -881,14 +925,11 @@ private class LeastSquaresAggregator(
* @param instance The instance of data point to be added.
* @return This LeastSquaresAggregator object.
*/
- def add(
- instance: Instance,
- effectiveCoefficientsVector: Vector,
- offset: Double,
- featuresStd: Array[Double]): this.type = {
+ def add(instance: Instance): this.type = {
+
instance match { case Instance(label, weight, features) =>
- require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
- s" Expecting $numFeatures but got ${features.size}.")
+ require(dim == features.size, s"Dimensions mismatch when adding new sample." +
+ s" Expecting $dim but got ${features.size}.")
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
@@ -897,9 +938,10 @@ private class LeastSquaresAggregator(
if (diff != 0) {
val localGradientSumArray = gradientSumArray
+ val localFeaturesStd = featuresStd
features.foreachActive { (index, value) =>
- if (featuresStd(index) != 0.0 && value != 0.0) {
- localGradientSumArray(index) += weight * diff * value / featuresStd(index)
+ if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+ localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)
}
}
lossSum += weight * diff * diff / 2.0
@@ -920,8 +962,8 @@ private class LeastSquaresAggregator(
* @return This LeastSquaresAggregator object.
*/
def merge(other: LeastSquaresAggregator): this.type = {
- require(numFeatures == other.numFeatures, s"Dimensions mismatch when merging with another " +
- s"LeastSquaresAggregator. Expecting $numFeatures but got ${other.numFeatures}.")
+ require(dim == other.dim, s"Dimensions mismatch when merging with another " +
+ s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
if (other.weightSum != 0) {
totalCnt += other.totalCnt
@@ -931,7 +973,7 @@ private class LeastSquaresAggregator(
var i = 0
val localThisGradientSumArray = this.gradientSumArray
val localOtherGradientSumArray = other.gradientSumArray
- while (i < numFeatures) {
+ while (i < dim) {
localThisGradientSumArray(i) += localOtherGradientSumArray(i)
i += 1
}
@@ -973,35 +1015,18 @@ private class LeastSquaresCostFun(
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
- val localFeaturesStd = featuresStd
-
- val (effectiveCoefficientsArray: Array[Double], offset: Double) = {
- val coefficientsArray = coefficients.toArray.clone()
- var sum = 0.0
- var i = 0
- val len = coefficientsArray.length
- while (i < len) {
- if (featuresStd(i) != 0.0) {
- coefficientsArray(i) /= featuresStd(i)
- sum += coefficientsArray(i) * featuresMean(i)
- } else {
- coefficientsArray(i) = 0.0
- }
- i += 1
- }
- val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
- (coefficientsArray, offset)
- }
-
- val effectiveCoefficientsVector = Vectors.dense(effectiveCoefficientsArray)
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
+ val bcCoeffs = instances.context.broadcast(coeffs)
val leastSquaresAggregator = {
val seqOp = (c: LeastSquaresAggregator, instance: Instance) =>
- c.add(instance, effectiveCoefficientsVector, offset, localFeaturesStd)
+ c.add(instance)
val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
instances.treeAggregate(
- new LeastSquaresAggregator(coeffs.size, labelStd))(seqOp, combOp)
+ new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean,
+ fitIntercept, bcFeaturesStd, bcFeaturesMean))(seqOp, combOp)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
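[Note on patch 2] Rather than threading the effective coefficients through every add(...) call, this patch hands the aggregator broadcast variables and marks the derived state @transient lazy: the large read-only inputs are shipped once per executor via Broadcast, and featuresStd, the effective coefficients, and the offset are excluded from the serialized aggregator and lazily rebuilt from the broadcast values on first access on each executor. (Inside add, featuresStd is also copied into a local so the hot foreachActive loop reads a plain array.) A minimal sketch of the pattern, assuming a Spark 2.x classpath:

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.ml.linalg.{Vector, Vectors}

    // Sketch: state derived from a broadcast is neither shipped with the task
    // closure nor re-serialized between tree-aggregation stages; it is rebuilt
    // from bcCoefficients.value wherever it is first touched.
    class SketchAggregator(bcCoefficients: Broadcast[Vector]) extends Serializable {
      @transient private lazy val effectiveCoefficients: Vector =
        Vectors.dense(bcCoefficients.value.toArray.clone())
    }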
From eb4baac8379d48ea3197a7d4d05c121ead35116e Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 8 Jul 2016 15:52:12 -0700
Subject: [PATCH 3/6] style
---
.../apache/spark/ml/regression/LinearRegression.scala | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index f6055321ab06..78588d264679 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ml.regression
-import org.apache.spark.broadcast.Broadcast
-
import scala.collection.mutable
import breeze.linalg.{DenseVector => BDV}
@@ -28,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -926,7 +925,6 @@ private class LeastSquaresAggregator(
* @return This LeastSquaresAggregator object.
*/
def add(instance: Instance): this.type = {
-
instance match { case Instance(label, weight, features) =>
require(dim == features.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $dim but got ${features.size}.")
@@ -1020,13 +1018,12 @@ private class LeastSquaresCostFun(
val bcCoeffs = instances.context.broadcast(coeffs)
val leastSquaresAggregator = {
- val seqOp = (c: LeastSquaresAggregator, instance: Instance) =>
- c.add(instance)
+ val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
val combOp = (c1: LeastSquaresAggregator, c2: LeastSquaresAggregator) => c1.merge(c2)
instances.treeAggregate(
- new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean,
- fitIntercept, bcFeaturesStd, bcFeaturesMean))(seqOp, combOp)
+ new LeastSquaresAggregator(bcCoeffs, labelStd, labelMean, fitIntercept, bcFeaturesStd,
+ bcFeaturesMean))(seqOp, combOp)
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
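[Note on patch 3] This patch is purely stylistic: the stray blank line inside add() goes away, the seqOp lambda fits back on one line, and the Broadcast import moves from the top of the file into the org.apache.spark group, matching the grouping already visible in the surrounding context (scala.* first, then third-party such as breeze, then org.apache.spark.*):

    import scala.collection.mutable

    import breeze.linalg.{DenseVector => BDV}

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.ml.linalg.{Vector, Vectors}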
From 10ba14e1bd56e7e51513c8399b75116e54add023 Mon Sep 17 00:00:00 2001
From: sethah
Date: Wed, 3 Aug 2016 08:31:46 -0700
Subject: [PATCH 4/6] destroy bc var
---
.../spark/ml/regression/LinearRegression.scala | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 78588d264679..a042589e3ef3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -894,7 +894,7 @@ private class LeastSquaresAggregator(
private val dim = bcCoefficients.value.size
@transient private lazy val featuresStd = bcFeaturesStd.value
- @transient private lazy val coefAndOffset = {
+ @transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
val featuresMean = bcFeaturesMean.value
var sum = 0.0
@@ -912,8 +912,8 @@ private class LeastSquaresAggregator(
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
(Vectors.dense(coefficientsArray), offset)
}
- @transient private lazy val effectiveCoefficientsVector = coefAndOffset._1
- @transient private lazy val offset = coefAndOffset._2
+ @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
+ @transient private lazy val offset = effectiveCoefAndOffset._2
private val gradientSumArray = Array.ofDim[Double](dim)
@@ -1011,10 +1011,11 @@ private class LeastSquaresCostFun(
featuresMean: Array[Double],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
+
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
- val bcFeaturesStd = instances.context.broadcast(featuresStd)
- val bcFeaturesMean = instances.context.broadcast(featuresMean)
val bcCoeffs = instances.context.broadcast(coeffs)
val leastSquaresAggregator = {
@@ -1027,6 +1028,7 @@ private class LeastSquaresCostFun(
}
val totalGradientArray = leastSquaresAggregator.gradient.toArray
+ bcCoeffs.destroy(blocking = false)
val regVal = if (effectiveL2regParam == 0.0) {
0.0
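[Note on patch 4] calculate() runs once per L-BFGS iteration, so broadcasting featuresStd and featuresMean inside it created fresh broadcast variables on every iteration that were never cleaned up. This patch hoists them into the class body (broadcast once per cost function) and explicitly destroys the one broadcast that must change each iteration, bcCoeffs, once the gradient is back on the driver. A simplified sketch of the resulting lifetimes (names mirror the diff; the body is a stand-in):

    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.rdd.RDD

    // Sketch: per-fit vs. per-iteration broadcast lifetimes.
    class CostFunSketch(instances: RDD[_], featuresStd: Array[Double]) {
      // Broadcast once per fit, reused by every iteration.
      val bcFeaturesStd = instances.context.broadcast(featuresStd)

      def calculate(coeffs: Vector): Unit = {
        // Re-broadcast the only value that changes between iterations...
        val bcCoeffs = instances.context.broadcast(coeffs)
        // ... treeAggregate over instances, reading bcCoeffs.value ...
        // ...then free the executor copies as soon as the result is back.
        bcCoeffs.destroy(blocking = false)
      }
    }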
From 0d9979582d9ed2c9869452f5a0d44d46ebaf9aeb Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 5 Aug 2016 13:15:25 -0700
Subject: [PATCH 5/6] add comment about serialization
---
.../scala/org/apache/spark/ml/regression/LinearRegression.scala | 2 ++
1 file changed, 2 insertions(+)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index a042589e3ef3..d3d36ae516a9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -893,6 +893,7 @@ private class LeastSquaresAggregator(
private var lossSum = 0.0
private val dim = bcCoefficients.value.size
+ // make transient so we do not serialize between aggregation stages
@transient private lazy val featuresStd = bcFeaturesStd.value
@transient private lazy val effectiveCoefAndOffset = {
val coefficientsArray = bcCoefficients.value.toArray.clone()
@@ -912,6 +913,7 @@ private class LeastSquaresAggregator(
val offset = if (fitIntercept) labelMean / labelStd - sum else 0.0
(Vectors.dense(coefficientsArray), offset)
}
+ // do not use tuple assignment above because it will circumvent the @transient tag
@transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1
@transient private lazy val offset = effectiveCoefAndOffset._2
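[Note on patch 5] Only two comments are added, but the second records a real Scala serialization pitfall: annotations on a pattern (tuple) definition are not copied onto the synthetic field that stores the underlying tuple, so `@transient lazy val (a, b) = ...` still serializes the tuple with the instance. A self-contained sketch of the pitfall and of the safe form used above (`compute` is a hypothetical stand-in):

    class TransientTupleSketch extends Serializable {
      private def compute(): (Array[Double], Double) = (Array(1.0, 2.0), 3.0)

      // Risky: with `@transient private lazy val (coefs, offset) = compute()`
      // the hidden field backing the pattern match is NOT transient, so the
      // tuple is serialized with the instance anyway.

      // Safe, as in the patch: one annotated val holds the tuple, then the
      // components are projected out of it.
      @transient private lazy val coefAndOffset = compute()
      @transient private lazy val coefs = coefAndOffset._1
      @transient private lazy val offset = coefAndOffset._2
    }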
From 9c2bf479ea8a82ee893716416bd364d3b1324c20 Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 5 Aug 2016 14:54:35 -0700
Subject: [PATCH 6/6] destroy featuresstd and featuresmean
---
.../ml/regression/LinearRegression.scala | 19 +++++++++++--------
1 file changed, 11 insertions(+), 8 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index d3d36ae516a9..76be4204e905 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -279,6 +279,8 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val yStd = if (rawYStd > 0) rawYStd else math.abs(yMean)
val featuresMean = featuresSummarizer.mean.toArray
val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+ val bcFeaturesMean = instances.context.broadcast(featuresMean)
+ val bcFeaturesStd = instances.context.broadcast(featuresStd)
if (!$(fitIntercept) && (0 until numFeatures).exists { i =>
featuresStd(i) == 0.0 && featuresMean(i) != 0.0 }) {
@@ -294,7 +296,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
val effectiveL2RegParam = (1.0 - $(elasticNetParam)) * effectiveRegParam
val costFun = new LeastSquaresCostFun(instances, yStd, yMean, $(fitIntercept),
- $(standardization), featuresStd, featuresMean, effectiveL2RegParam)
+ $(standardization), bcFeaturesStd, bcFeaturesMean, effectiveL2RegParam)
val optimizer = if ($(elasticNetParam) == 0.0 || effectiveRegParam == 0.0) {
new BreezeLBFGS[BDV[Double]]($(maxIter), 10, $(tol))
@@ -339,6 +341,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
throw new SparkException(msg)
}
+ bcFeaturesMean.destroy(blocking = false)
+ bcFeaturesStd.destroy(blocking = false)
+
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
@@ -1009,16 +1014,14 @@ private class LeastSquaresCostFun(
labelMean: Double,
fitIntercept: Boolean,
standardization: Boolean,
- featuresStd: Array[Double],
- featuresMean: Array[Double],
+ bcFeaturesStd: Broadcast[Array[Double]],
+ bcFeaturesMean: Broadcast[Array[Double]],
effectiveL2regParam: Double) extends DiffFunction[BDV[Double]] {
- val bcFeaturesStd = instances.context.broadcast(featuresStd)
- val bcFeaturesMean = instances.context.broadcast(featuresMean)
-
override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
val coeffs = Vectors.fromBreeze(coefficients)
val bcCoeffs = instances.context.broadcast(coeffs)
+ val localFeaturesStd = bcFeaturesStd.value
val leastSquaresAggregator = {
val seqOp = (c: LeastSquaresAggregator, instance: Instance) => c.add(instance)
@@ -1044,13 +1047,13 @@ private class LeastSquaresCostFun(
totalGradientArray(index) += effectiveL2regParam * value
value * value
} else {
- if (featuresStd(index) != 0.0) {
+ if (localFeaturesStd(index) != 0.0) {
// If `standardization` is false, we still standardize the data
// to improve the rate of convergence; as a result, we have to
// perform this reverse standardization by penalizing each component
// differently to get effectively the same objective function when
// the training dataset is not standardized.
- val temp = value / (featuresStd(index) * featuresStd(index))
+ val temp = value / (localFeaturesStd(index) * localFeaturesStd(index))
totalGradientArray(index) += effectiveL2regParam * temp
value * temp
} else {
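[Note on patch 6] The final patch completes the cleanup: the summary-statistic broadcasts move all the way up into train(), are threaded through LeastSquaresCostFun's constructor as Broadcast[Array[Double]] parameters, and are destroyed once the optimizer has finished; inside calculate(), bcFeaturesStd.value is read into a local before the regularization loop so the hot loop indexes a plain array. An end-state sketch of the broadcast flow (simplified; the try/finally is an editorial choice here, whereas the patch destroys the broadcasts after checking the optimizer state):

    import org.apache.spark.rdd.RDD

    // Sketch: broadcast the feature statistics once per fit, hand them to the
    // cost function, and tear them down after optimization finishes.
    def trainSketch(instances: RDD[_],
                    featuresStd: Array[Double],
                    featuresMean: Array[Double]): Unit = {
      val bcFeaturesMean = instances.context.broadcast(featuresMean)
      val bcFeaturesStd = instances.context.broadcast(featuresStd)
      try {
        // construct LeastSquaresCostFun(..., bcFeaturesStd, bcFeaturesMean, ...)
        // and run L-BFGS to convergence
      } finally {
        bcFeaturesMean.destroy(blocking = false)
        bcFeaturesStd.destroy(blocking = false)
      }
    }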