From 232625fadd0d9e2d3e2f5f5f81e71292a735b3ca Mon Sep 17 00:00:00 2001
From: sethah
Date: Sun, 26 Feb 2017 20:25:14 -0800
Subject: [PATCH 1/4] performance cleanup in svc agg

---
 .../spark/ml/classification/LinearSVC.scala   | 27 ++++++-------------
 1 file changed, 8 insertions(+), 19 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index bf6e76d7ac44..42deed3c59bf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -440,19 +440,9 @@ private class LinearSVCAggregator(
 
   private val numFeatures: Int = bcFeaturesStd.value.length
   private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  private val coefficients: Vector = bcCoefficients.value
   private var weightSum: Double = 0.0
   private var lossSum: Double = 0.0
-  require(numFeaturesPlusIntercept == coefficients.size, s"Dimension mismatch. Coefficients " +
-    s"length ${coefficients.size}, FeaturesStd length ${numFeatures}, fitIntercept: $fitIntercept")
-
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
-  }
-  private val gradientSumArray = Array.fill[Double](coefficientsArray.length)(0)
+  private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)
 
   /**
    * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
@@ -463,9 +453,11 @@ private class LinearSVCAggregator(
    */
   def add(instance: Instance): this.type = {
     instance match { case Instance(label, weight, features) =>
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       if (weight == 0.0) return this
       val localFeaturesStd = bcFeaturesStd.value
-      val localCoefficients = coefficientsArray
+      val localCoefficients = bcCoefficients.value.toArray
       val localGradientSumArray = gradientSumArray
 
       val dotProduct = {
@@ -530,18 +522,15 @@ private class LinearSVCAggregator(
     this
   }
 
-  def loss: Double = {
-    if (weightSum != 0) {
-      lossSum / weightSum
-    } else 0.0
-  }
+  def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0
 
   def gradient: Vector = {
     if (weightSum != 0) {
       val result = Vectors.dense(gradientSumArray.clone())
       scal(1.0 / weightSum, result)
       result
-    } else Vectors.dense(Array.fill[Double](coefficientsArray.length)(0))
+    } else {
+      Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
+    }
   }
-
 }
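[Note on patch 1: commentary, not part of the patch series]

Before this change, LinearSVCAggregator dereferenced bcCoefficients.value into
regular vals at construction time. The aggregator object itself is serialized
into every task, so the full coefficient array rode along in each task closure,
defeating the point of broadcasting it, and the zero-filled gradientSumArray
built on the driver was shipped the same way. Declaring gradientSumArray a
lazy val means the driver-side zero aggregator serializes without the array;
each executor allocates its own copy on the first add(). A minimal sketch of
that effect, assuming plain Java serialization; Agg and the printed sizes are
illustrative, not Spark code:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Hypothetical stand-in for an MLlib aggregator.
    class Agg(n: Int) extends Serializable {
      // Lazy, as in the patch: a never-touched instance serializes this
      // field as null rather than as n zero doubles.
      private lazy val gradientSumArray = new Array[Double](n)
      def add(x: Double): Unit = gradientSumArray(0) += x
    }

    object AggSizeDemo {
      private def serializedSize(o: AnyRef): Int = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(o)
        out.close()
        buffer.size()
      }

      def main(args: Array[String]): Unit = {
        val fresh = new Agg(1000000) // what the driver would serialize per task
        val used = new Agg(1000000)
        used.add(1.0)                // forces the lazy allocation
        println(s"before first add: ${serializedSize(fresh)} bytes") // tiny
        println(s"after first add:  ${serializedSize(used)} bytes")  // ~8 MB
      }
    }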
+ + s" Expecting $numFeatures but got ${features.size}.") if (weight == 0.0) return this val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = coefficientsArray + val localCoefficients = bcCoefficients.value.toArray val localGradientSumArray = gradientSumArray val dotProduct = { @@ -530,18 +522,15 @@ private class LinearSVCAggregator( this } - def loss: Double = { - if (weightSum != 0) { - lossSum / weightSum - } else 0.0 - } + def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0 def gradient: Vector = { if (weightSum != 0) { val result = Vectors.dense(gradientSumArray.clone()) scal(1.0 / weightSum, result) result - } else Vectors.dense(Array.fill[Double](coefficientsArray.length)(0)) + } else { + Vectors.dense(new Array[Double](numFeaturesPlusIntercept)) + } } - } From 59583fa6bfec650e6873cb011c7736cfb4d8def9 Mon Sep 17 00:00:00 2001 From: sethah Date: Mon, 27 Feb 2017 08:31:07 -0800 Subject: [PATCH 2/4] add sparse coeff test --- .../spark/ml/classification/LinearSVC.scala | 7 ++++++- .../ml/classification/LinearSVCSuite.scala | 19 ++++++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 42deed3c59bf..553c80a44ade 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -442,6 +442,11 @@ private class LinearSVCAggregator( private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures private var weightSum: Double = 0.0 private var lossSum: Double = 0.0 + @transient private lazy val coefficientsArray = bcCoefficients.value match { + case DenseVector(values) => values + case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" + + s" but got type ${bcCoefficients.value.getClass}.") + } private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept) /** @@ -457,7 +462,7 @@ private class LinearSVCAggregator( s" Expecting $numFeatures but got ${features.size}.") if (weight == 0.0) return this val localFeaturesStd = bcFeaturesStd.value - val localCoefficients = bcCoefficients.value.toArray + val localCoefficients = coefficientsArray val localGradientSumArray = gradientSumArray val dotProduct = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index ee2aefee7a6d..4d5a9e8ee151 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -18,12 +18,10 @@ package org.apache.spark.ml.classification import scala.util.Random - import breeze.linalg.{DenseVector => BDV} - import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ -import org.apache.spark.ml.feature.LabeledPoint +import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} @@ -123,6 +121,21 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model2.intercept !== 0.0) } + test("sparse coefficients in SVCAggregator") { + val bcCoefficients = 
From 961fbaa194e35342d933f4cf0ac18b1f98d6bac0 Mon Sep 17 00:00:00 2001
From: sethah
Date: Mon, 27 Feb 2017 08:33:41 -0800
Subject: [PATCH 3/4] style on imports

---
 .../org/apache/spark/ml/classification/LinearSVCSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
index 4d5a9e8ee151..a165d8a9345c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.ml.classification
 
 import scala.util.Random
+
 import breeze.linalg.{DenseVector => BDV}
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.classification.LinearSVCSuite._
 import org.apache.spark.ml.feature.{Instance, LabeledPoint}
+ s" Expecting $numFeatures but got ${features.size}.") if (weight == 0.0) return this diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 738b35135f7a..1a78187d4f8e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1436,7 +1436,7 @@ private class LogisticAggregator( case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " + s"got type ${bcCoefficients.value.getClass}.)") } - private val gradientSumArray = new Array[Double](coefficientSize) + private lazy val gradientSumArray = new Array[Double](coefficientSize) if (multinomial && numClasses <= 2) { logInfo(s"Multinomial logistic regression for binary classification yields separate " + diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index ea2dc6cfd8d3..a9c1a7ba0bc8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -580,10 +580,10 @@ private class ExpectationAggregator( private val k: Int = bcWeights.value.length private var totalCnt: Long = 0L private var newLogLikelihood: Double = 0.0 - private val newWeights: Array[Double] = new Array[Double](k) - private val newMeans: Array[DenseVector] = Array.fill(k)( + private lazy val newWeights: Array[Double] = new Array[Double](k) + private lazy val newMeans: Array[DenseVector] = Array.fill(k)( new DenseVector(Array.fill[Double](numFeatures)(0.0))) - private val newCovs: Array[DenseVector] = Array.fill(k)( + private lazy val newCovs: Array[DenseVector] = Array.fill(k)( new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0))) @transient private lazy val oldGaussians = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 4b3608330c1b..094853b6f480 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -526,7 +526,7 @@ private class AFTAggregator( private var totalCnt: Long = 0L private var lossSum = 0.0 // Here we optimize loss function over log(sigma), intercept and coefficients - private val gradientSumArray = Array.ofDim[Double](length) + private lazy val gradientSumArray = Array.ofDim[Double](length) def count: Long = totalCnt def loss: Double = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 2de7e81d8d41..45df1d9be647 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -959,7 +959,7 @@ private class LeastSquaresAggregator( @transient private lazy val effectiveCoefficientsVector = effectiveCoefAndOffset._1 @transient private lazy val offset = effectiveCoefAndOffset._2 - private val gradientSumArray = Array.ofDim[Double](dim) + private lazy val gradientSumArray = Array.ofDim[Double](dim) /** * Add a new training instance to this LeastSquaresAggregator, and 
update the loss and gradient
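[Note on patch 4: commentary, not part of the patch series]

Patch 4 applies the same fix to the other aggregators, whose gradient buffers
(or, for GaussianMixture, sufficient-statistics buffers) were also eagerly
allocated on the driver. These objects serve as the zero value of a
treeAggregate call, so the driver-side instance is serialized into every task;
with lazy vals that payload stays empty until a partition actually processes
an instance, and an aggregator for an empty partition never allocates at all.
A self-contained sketch of that round trip under those assumptions;
ToyAggregator is hypothetical, and the real aggregators additionally carry
broadcast coefficients and feature standardization:

    import org.apache.spark.sql.SparkSession

    class ToyAggregator(dim: Int) extends Serializable {
      private var weightSum = 0.0
      // Lazy, mirroring the patch: the zero aggregator serialized from the
      // driver carries no array; each task allocates one on first add().
      private lazy val gradientSumArray = new Array[Double](dim)

      def add(features: Array[Double], weight: Double): this.type = {
        if (weight != 0.0) {
          var i = 0
          while (i < dim) {
            gradientSumArray(i) += weight * features(i)
            i += 1
          }
          weightSum += weight
        }
        this
      }

      def merge(other: ToyAggregator): this.type = {
        // Skipping empty partners also avoids forcing their lazy arrays.
        if (other.weightSum != 0.0) {
          weightSum += other.weightSum
          var i = 0
          while (i < dim) {
            gradientSumArray(i) += other.gradientSumArray(i)
            i += 1
          }
        }
        this
      }

      def gradient: Array[Double] =
        if (weightSum != 0.0) gradientSumArray.map(_ / weightSum)
        else new Array[Double](dim)
    }

    object ToyAggregatorDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("toy").getOrCreate()
        val data = spark.sparkContext.parallelize(Seq.fill(100)((Array(1.0, 2.0), 1.0)))
        val agg = data.treeAggregate(new ToyAggregator(2))(
          seqOp = (a, p) => a.add(p._1, p._2),
          combOp = (a, b) => a.merge(b))
        println(agg.gradient.mkString(",")) // 1.0,2.0
        spark.stop()
      }
    }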