From 2a1e632e747620f6fa0f2fcabce9431c6061e742 Mon Sep 17 00:00:00 2001
From: sethah
Date: Thu, 16 Jun 2016 16:24:52 -0700
Subject: [PATCH 1/5] remove unnecessary serialization

---
 .../classification/LogisticRegression.scala | 52 +++++++++++--------
 .../classification/LogisticRegression.scala |  2 +-
 2 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 51ede15d6c36..48fb9170704c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -937,42 +937,41 @@ class BinaryLogisticRegressionSummary private[classification] (
  * Two LogisticAggregator can be merged together to have a summary of loss and gradient of
  * the corresponding joint dataset.
  *
- * @param coefficients The coefficients corresponding to the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
  *                   Multinomial Logistic Regression.
  * @param fitIntercept Whether to fit an intercept term.
- * @param featuresStd The standard deviation values of the features.
- * @param featuresMean The mean values of the features.
  */
 private class LogisticAggregator(
-    coefficients: Vector,
+    numFeatures: Int,
     numClasses: Int,
-    fitIntercept: Boolean,
-    featuresStd: Array[Double],
-    featuresMean: Array[Double]) extends Serializable {
+    fitIntercept: Boolean) extends Serializable {
 
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val coefficientsArray = coefficients match {
-    case dv: DenseVector => dv.values
-    case _ =>
-      throw new IllegalArgumentException(
-        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
-  }
-
-  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
+//  private val coefficientsArray = coefficients match {
+//    case dv: DenseVector => dv.values
+//    case _ =>
+//      throw new IllegalArgumentException(
+//        s"coefficients only supports dense vector b ut got type ${coefficients.getClass}.")
+//  }
 
-  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)
+//  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
+  private val dim = numFeatures//if (fitIntercept) featuresStd.length else featuresStd.length
+  private val gradientSumArray = Array.ofDim[Double](dim)
 
   /**
    * Add a new training instance to this LogisticAggregator, and update the loss and gradient
    * of the objective function.
    *
    * @param instance The instance of data point to be added.
+   * @param coefficients The coefficients corresponding to the features.
+   * @param featuresStd The standard deviation values of the features.
    * @return This LogisticAggregator object.
    */
-  def add(instance: Instance): this.type = {
+  def add(instance: Instance,
+      coefficients: Vector,
+      featuresStd: Array[Double]): this.type = {
     instance match { case Instance(label, weight, features) =>
       require(dim == features.size, s"Dimensions mismatch when adding new instance." +
         s" Expecting $dim but got ${features.size}.")
@@ -980,7 +979,12 @@ private class LogisticAggregator(
 
       if (weight == 0.0) return this
 
-      val localCoefficientsArray = coefficientsArray
+      val coefficientsArray = coefficients match {
+        case dv: DenseVector => dv.values
+        case _ =>
+          throw new IllegalArgumentException(
+            s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
+      }
       val localGradientSumArray = gradientSumArray
 
       numClasses match {
@@ -990,11 +994,11 @@ private class LogisticAggregator(
             var sum = 0.0
             features.foreachActive { (index, value) =>
               if (featuresStd(index) != 0.0 && value != 0.0) {
-                sum += localCoefficientsArray(index) * (value / featuresStd(index))
+                sum += coefficientsArray(index) * (value / featuresStd(index))
               }
             }
             sum + {
-              if (fitIntercept) localCoefficientsArray(dim) else 0.0
+              if (fitIntercept) coefficientsArray(dim) else 0.0
             }
           }
@@ -1086,13 +1090,17 @@ private class LogisticCostFun(
 
   override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val numFeatures = featuresStd.length
     val coeffs = Vectors.fromBreeze(coefficients)
+    val n = coeffs.size
+    val localFeaturesStd = featuresStd
+
     val logisticAggregator = {
-      val seqOp = (c: LogisticAggregator, instance: Instance) => c.add(instance)
+      val seqOp = (c: LogisticAggregator, instance: Instance) =>
+        c.add(instance, coeffs, localFeaturesStd)
       val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
 
       instances.treeAggregate(
-        new LogisticAggregator(coeffs, numClasses, fitIntercept, featuresStd, featuresMean)
+        new LogisticAggregator(n, numClasses, fitIntercept)
       )(seqOp, combOp)
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index e4cbf5acbc11..eaf3ac0c1615 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -421,7 +421,7 @@ class LogisticRegressionWithLBFGS
   private def run(input: RDD[LabeledPoint], initialWeights: Vector, userSuppliedWeights: Boolean):
       LogisticRegressionModel = {
     // ml's Logistic regression only supports binary classification currently.
-    if (numOfLinearPredictor == 1) {
+    if (numOfLinearPredictor == 1 && false) {
       def runWithMlLogisticRegression(elasticNetParam: Double) = {
         // Prepare the ml LogisticRegression based on our settings
         val lr = new org.apache.spark.ml.classification.LogisticRegression()

From 86505b75cd083c82a3cdefe1221eb6ed4e9750bb Mon Sep 17 00:00:00 2001
From: sethah
Date: Thu, 16 Jun 2016 18:28:10 -0700
Subject: [PATCH 2/5] comments

---
 .../spark/ml/classification/LogisticRegression.scala | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 48fb9170704c..db43408bd903 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -949,15 +949,7 @@ private class LogisticAggregator(
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-//  private val coefficientsArray = coefficients match {
-//    case dv: DenseVector => dv.values
-//    case _ =>
-//      throw new IllegalArgumentException(
-//        s"coefficients only supports dense vector b ut got type ${coefficients.getClass}.")
-//  }
-
-//  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
-  private val dim = numFeatures//if (fitIntercept) featuresStd.length else featuresStd.length
+  private val dim = numFeatures
   private val gradientSumArray = Array.ofDim[Double](dim)
 
   /**

From ef8fdea808052846055979c642b5f47255ee9e3d Mon Sep 17 00:00:00 2001
From: sethah
Date: Thu, 16 Jun 2016 20:45:34 -0700
Subject: [PATCH 3/5] dimension corrections

---
 .../apache/spark/ml/classification/LogisticRegression.scala | 5 +++--
 .../spark/mllib/classification/LogisticRegression.scala     | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index db43408bd903..8bde030616b9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -950,7 +950,8 @@ private class LogisticAggregator(
   private var lossSum = 0.0
 
   private val dim = numFeatures
-  private val gradientSumArray = Array.ofDim[Double](dim)
+  private val gradientSumArray =
+    Array.ofDim[Double](if (fitIntercept) numFeatures + 1 else numFeatures)
 
   /**
    * Add a new training instance to this LogisticAggregator, and update the loss and gradient
@@ -1092,7 +1093,7 @@ private class LogisticCostFun(
       val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
 
       instances.treeAggregate(
-        new LogisticAggregator(n, numClasses, fitIntercept)
+        new LogisticAggregator(numFeatures, numClasses, fitIntercept)
       )(seqOp, combOp)
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index eaf3ac0c1615..e4cbf5acbc11 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -421,7 +421,7 @@ class LogisticRegressionWithLBFGS
   private def run(input: RDD[LabeledPoint], initialWeights: Vector, userSuppliedWeights: Boolean):
       LogisticRegressionModel = {
     // ml's Logistic regression only supports binary classification currently.
-    if (numOfLinearPredictor == 1 && false) {
+    if (numOfLinearPredictor == 1) {
       def runWithMlLogisticRegression(elasticNetParam: Double) = {
         // Prepare the ml LogisticRegression based on our settings
         val lr = new org.apache.spark.ml.classification.LogisticRegression()

From 96b0a4505b4a43bc254065e084fb9b72b1e4a92b Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 17 Jun 2016 08:29:46 -0700
Subject: [PATCH 4/5] spark style

---
 .../spark/ml/classification/LogisticRegression.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 8bde030616b9..102c117960db 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -962,9 +962,10 @@ private class LogisticAggregator(
    * @param featuresStd The standard deviation values of the features.
    * @return This LogisticAggregator object.
    */
-  def add(instance: Instance,
-      coefficients: Vector,
-      featuresStd: Array[Double]): this.type = {
+  def add(
+      instance: Instance,
+      coefficients: Vector,
+      featuresStd: Array[Double]): this.type = {
     instance match { case Instance(label, weight, features) =>
       require(dim == features.size, s"Dimensions mismatch when adding new instance." +
         s" Expecting $dim but got ${features.size}.")

From 5d668a6f93859801262393540fe954257f433a35 Mon Sep 17 00:00:00 2001
From: sethah
Date: Fri, 17 Jun 2016 09:16:37 -0700
Subject: [PATCH 5/5] removing dim

---
 .../ml/classification/LogisticRegression.scala | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 102c117960db..9469acf62e13 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -942,14 +942,13 @@ class BinaryLogisticRegressionSummary private[classification] (
  * @param fitIntercept Whether to fit an intercept term.
  */
 private class LogisticAggregator(
-    numFeatures: Int,
+    private val numFeatures: Int,
     numClasses: Int,
     fitIntercept: Boolean) extends Serializable {
 
   private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val dim = numFeatures
   private val gradientSumArray =
     Array.ofDim[Double](if (fitIntercept) numFeatures + 1 else numFeatures)
 
@@ -967,8 +966,8 @@ private class LogisticAggregator(
       coefficients: Vector,
       featuresStd: Array[Double]): this.type = {
     instance match { case Instance(label, weight, features) =>
-      require(dim == features.size, s"Dimensions mismatch when adding new instance." +
-        s" Expecting $dim but got ${features.size}.")
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
       require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
       if (weight == 0.0) return this
@@ -992,7 +991,7 @@ private class LogisticAggregator(
               }
             }
             sum + {
-              if (fitIntercept) coefficientsArray(dim) else 0.0
+              if (fitIntercept) coefficientsArray(numFeatures) else 0.0
             }
           }
 
@@ -1005,7 +1004,7 @@ private class LogisticAggregator(
           }
 
           if (fitIntercept) {
-            localGradientSumArray(dim) += multiplier
+            localGradientSumArray(numFeatures) += multiplier
           }
 
           if (label > 0) {
@@ -1032,8 +1031,8 @@ private class LogisticAggregator(
    * @return This LogisticAggregator object.
    */
   def merge(other: LogisticAggregator): this.type = {
-    require(dim == other.dim, s"Dimensions mismatch when merging with another " +
-      s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
+    require(numFeatures == other.numFeatures, s"Dimensions mismatch when merging with another " +
+      s"LeastSquaresAggregator. Expecting $numFeatures but got ${other.numFeatures}.")
 
     if (other.weightSum != 0.0) {
       weightSum += other.weightSum