From dbda03302bedf52913582030da3eac567c5d8775 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Thu, 5 Mar 2015 14:28:43 +0530
Subject: [PATCH 1/5] [SPARK-6025] Add helper method evaluateEachIteration to
 extract learning curve

---
 docs/mllib-ensembles.md                       |  4 +-
 .../mllib/tree/GradientBoostedTrees.scala     | 43 ++++++++++++++++++-
 .../spark/mllib/tree/loss/AbsoluteError.scala | 19 ++++++++
 .../spark/mllib/tree/loss/LogLoss.scala       | 19 ++++++++
 .../apache/spark/mllib/tree/loss/Loss.scala   | 10 +++++
 .../spark/mllib/tree/loss/SquaredError.scala  | 18 ++++++++
 .../tree/GradientBoostedTreesSuite.scala      | 17 +++++++-
 7 files changed, 125 insertions(+), 5 deletions(-)

diff --git a/docs/mllib-ensembles.md b/docs/mllib-ensembles.md
index cbfb682609af..7521fb14a7bd 100644
--- a/docs/mllib-ensembles.md
+++ b/docs/mllib-ensembles.md
@@ -464,8 +464,8 @@ first one being the training dataset and the second being the validation dataset
 The training is stopped when the improvement in the validation error is not more than a certain
 tolerance (supplied by the `validationTol` argument in `BoostingStrategy`). In practice, the validation
 error decreases initially and later increases. There might be cases in which the validation error does not change monotonically,
-and the user is advised to set a large enough negative tolerance and examine the validation curve to to tune the number of
-iterations.
+and the user is advised to set a large enough negative tolerance and examine the validation curve using `evaluateEachIteration`
+(which gives the error or loss per iteration) to tune the number of iterations.
 
 ### Examples
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
index a9c93e181e3c..54199eb33c79 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
@@ -25,6 +25,7 @@ import org.apache.spark.mllib.tree.configuration.BoostingStrategy
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.impl.TimeTracker
 import org.apache.spark.mllib.tree.impurity.Variance
+import org.apache.spark.mllib.tree.loss.Loss
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -52,6 +53,10 @@ import org.apache.spark.storage.StorageLevel
 class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
   extends Serializable with Logging {
 
+  private val numIterations = boostingStrategy.numIterations
+  private var baseLearners = new Array[DecisionTreeModel](numIterations)
+  private var baseLearnerWeights = new Array[Double](numIterations)
+
   /**
    * Method to train a gradient boosting model
    * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
@@ -59,7 +64,7 @@ class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
    */
   def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel = {
     val algo = boostingStrategy.treeStrategy.algo
-    algo match {
+    val fitGradientBoostingModel = algo match {
       case Regression => GradientBoostedTrees.boost(input, input, boostingStrategy, validate=false)
       case Classification =>
         // Map labels to -1, +1 so binary classification can be treated as regression.
@@ -69,6 +74,42 @@ class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
       case _ =>
         throw new IllegalArgumentException(s"$algo is not supported by the gradient boosting.")
     }
+    baseLearners = fitGradientBoostingModel.trees
+    baseLearnerWeights = fitGradientBoostingModel.treeWeights
+    fitGradientBoostingModel
+  }
+
+  /**
+   * Method to compute error or loss for every iteration of gradient boosting.
+   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param loss: evaluation metric that defaults to boostingStrategy.loss
+   * @return an array with index i having the losses or errors for the ensemble
+   *         containing trees 1 to i + 1
+   */
+  def evaluateEachIteration(
+      data: RDD[LabeledPoint],
+      loss: Loss = boostingStrategy.loss) : Array[Double] = {
+
+    val algo = boostingStrategy.treeStrategy.algo
+    val remappedData = algo match {
+      case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+      case _ => data
+    }
+    val initialTree = baseLearners(0)
+    val evaluationArray = Array.fill(numIterations)(0.0)
+
+    // Initial weight is 1.0
+    var predictionRDD = remappedData.map(i => initialTree.predict(i.features))
+    evaluationArray(0) = loss.computeError(remappedData, predictionRDD)
+
+    (1 until numIterations).map {nTree =>
+      predictionRDD = (remappedData zip predictionRDD) map {
+        case (point, pred) =>
+          pred + baseLearners(nTree).predict(point.features) * baseLearnerWeights(nTree)
+      }
+      evaluationArray(nTree) = loss.computeError(remappedData, predictionRDD)
+    }
+    evaluationArray
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
index d1bde15e6b15..8020b5e7e3df 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -61,4 +61,23 @@ object AbsoluteError extends Loss {
       math.abs(err)
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+   * @param prediction: RDD[Double] of predicted labels.
+   * @return Mean absolute error of model on data
+   */
+  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
+    val errorAcrossSamples = (data zip prediction) map {
+      case (yTrue, yPred) => {
+        val err = yTrue.label - yPred
+        math.abs(err)
+      }
+    }
+    errorAcrossSamples.mean()
+  }
+
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
index 55213e695638..2c0c84fdce4f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -66,4 +66,23 @@ object LogLoss extends Loss {
       2.0 * MLUtils.log1pExp(-margin)
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+   * @param prediction: RDD[Double] of predicted labels.
+   * @return Mean log loss of model on data
+   */
+  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
+    val errorAcrossSamples = (data zip prediction) map {
+      case (yTrue, yPred) =>
+        val margin = 2.0 * yTrue.label * yPred
+        // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
+        2.0 * MLUtils.log1pExp(-margin)
+    }
+    errorAcrossSamples.mean()
+  }
+
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
index e1169d9f66ea..944a64c0fd88 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -49,4 +49,14 @@ trait Loss extends Serializable {
    */
   def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double
 
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+   * @param prediction: RDD[Double] of predicted labels.
+   * @return Measure of model error on data
+   */
+  def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]) : Double
+
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index 50ecaa2f86f3..ed566a8fbb27 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -61,4 +61,22 @@ object SquaredError extends Loss {
       err * err
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
+   * @param prediction: RDD[Double] of predicted labels.
+   * @return Mean squared error of model on data
+   */
+  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
+    val errorAcrossSamples = (data zip prediction) map {
+      case (yTrue, yPred) =>
+        val err = yPred - yTrue.label
+        err * err
+    }
+    errorAcrossSamples.mean()
+  }
+
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index b437aeaaf054..dc930d3e3b67 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -175,10 +175,12 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
         new BoostingStrategy(treeStrategy, loss, numIterations, validationTol = 0.0)
       val gbtValidate = new GradientBoostedTrees(boostingStrategy)
         .runWithValidation(trainRdd, validateRdd)
-      assert(gbtValidate.numTrees !== numIterations)
+      val numTrees = gbtValidate.numTrees
+      assert(numTrees !== numIterations)
 
       // Test that it performs better on the validation dataset.
-      val gbt = GradientBoostedTrees.train(trainRdd, boostingStrategy)
+      val gbtModel = new GradientBoostedTrees(boostingStrategy)
+      val gbt = gbtModel.run(trainRdd)
       val (errorWithoutValidation, errorWithValidation) = {
         if (algo == Classification) {
           val remappedRdd = validateRdd.map(x => new LabeledPoint(2 * x.label - 1, x.features))
@@ -188,6 +190,17 @@
         }
       }
       assert(errorWithValidation <= errorWithoutValidation)
+
+      // Test that results from evaluateEachIteration comply with runWithValidation.
+      // Note that convergenceTol is set to 0.0
+      val evaluationArray = gbtModel.evaluateEachIteration(validateRdd)
+      assert(evaluationArray.length === numIterations)
+      assert(evaluationArray(numTrees) > evaluationArray(numTrees - 1))
+      var i = 1
+      while (i < numTrees) {
+        assert(evaluationArray(i) < evaluationArray(i - 1))
+        i += 1
+      }
     }
   }
 }

From bc99ac68fdd81e5f0569a9cfd2d1eb97498885b6 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 11 Mar 2015 01:09:10 +0530
Subject: [PATCH 2/5] Refactor the method and stuff

---
 .../mllib/tree/GradientBoostedTrees.scala     | 43 +---------------
 .../spark/mllib/tree/loss/AbsoluteError.scala | 17 +++----
 .../spark/mllib/tree/loss/LogLoss.scala       | 18 +++----
 .../apache/spark/mllib/tree/loss/Loss.scala   |  8 +--
 .../spark/mllib/tree/loss/SquaredError.scala  | 16 +++---
 .../mllib/tree/model/treeEnsembleModels.scala | 49 +++++++++++++++++++
 .../tree/GradientBoostedTreesSuite.scala      |  5 +-
 7 files changed, 75 insertions(+), 81 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
index 54199eb33c79..a9c93e181e3c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
@@ -25,7 +25,6 @@ import org.apache.spark.mllib.tree.configuration.BoostingStrategy
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.impl.TimeTracker
 import org.apache.spark.mllib.tree.impurity.Variance
-import org.apache.spark.mllib.tree.loss.Loss
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTreesModel}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
@@ -53,10 +52,6 @@ import org.apache.spark.storage.StorageLevel
 class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
   extends Serializable with Logging {
 
-  private val numIterations = boostingStrategy.numIterations
-  private var baseLearners = new Array[DecisionTreeModel](numIterations)
-  private var baseLearnerWeights = new Array[Double](numIterations)
-
   /**
    * Method to train a gradient boosting model
    * @param input Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
@@ -64,7 +59,7 @@ class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
    */
   def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel = {
     val algo = boostingStrategy.treeStrategy.algo
-    val fitGradientBoostingModel = algo match {
+    algo match {
       case Regression => GradientBoostedTrees.boost(input, input, boostingStrategy, validate=false)
       case Classification =>
         // Map labels to -1, +1 so binary classification can be treated as regression.
@@ -74,42 +69,6 @@ class GradientBoostedTrees(private val boostingStrategy: BoostingStrategy)
       case _ =>
         throw new IllegalArgumentException(s"$algo is not supported by the gradient boosting.")
     }
-    baseLearners = fitGradientBoostingModel.trees
-    baseLearnerWeights = fitGradientBoostingModel.treeWeights
-    fitGradientBoostingModel
-  }
-
-  /**
-   * Method to compute error or loss for every iteration of gradient boosting.
-   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
-   * @param loss: evaluation metric that defaults to boostingStrategy.loss
-   * @return an array with index i having the losses or errors for the ensemble
-   *         containing trees 1 to i + 1
-   */
-  def evaluateEachIteration(
-      data: RDD[LabeledPoint],
-      loss: Loss = boostingStrategy.loss) : Array[Double] = {
-
-    val algo = boostingStrategy.treeStrategy.algo
-    val remappedData = algo match {
-      case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
-      case _ => data
-    }
-    val initialTree = baseLearners(0)
-    val evaluationArray = Array.fill(numIterations)(0.0)
-
-    // Initial weight is 1.0
-    var predictionRDD = remappedData.map(i => initialTree.predict(i.features))
-    evaluationArray(0) = loss.computeError(remappedData, predictionRDD)
-
-    (1 until numIterations).map {nTree =>
-      predictionRDD = (remappedData zip predictionRDD) map {
-        case (point, pred) =>
-          pred + baseLearners(nTree).predict(point.features) * baseLearnerWeights(nTree)
-      }
-      evaluationArray(nTree) = loss.computeError(remappedData, predictionRDD)
-    }
-    evaluationArray
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
index 8020b5e7e3df..f69960ee3714 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -66,18 +66,13 @@ object AbsoluteError extends Loss {
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @param prediction: RDD[Double] of predicted labels.
-   * @return Mean absolute error of model on data
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return Absolute error of model on the given datapoint.
    */
-  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
-    val errorAcrossSamples = (data zip prediction) map {
-      case (yTrue, yPred) => {
-        val err = yTrue.label - yPred
-        math.abs(err)
-      }
-    }
-    errorAcrossSamples.mean()
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val err = datum.label - prediction
+    math.abs(err)
   }
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
index 2c0c84fdce4f..06ec62ea8252 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -71,18 +71,14 @@ object LogLoss extends Loss {
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @param prediction: RDD[Double] of predicted labels.
-   * @return Mean log loss of model on data
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return log loss of model on the datapoint.
    */
-  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
-    val errorAcrossSamples = (data zip prediction) map {
-      case (yTrue, yPred) =>
-        val margin = 2.0 * yTrue.label * yPred
-        // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
-        2.0 * MLUtils.log1pExp(-margin)
-    }
-    errorAcrossSamples.mean()
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val margin = 2.0 * datum.label * prediction
+    // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
+    2.0 * MLUtils.log1pExp(-margin)
   }
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
index 944a64c0fd88..62d9c86bb14a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -53,10 +53,10 @@ trait Loss extends Serializable {
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @param prediction: RDD[Double] of predicted labels.
-   * @return Measure of model error on data
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return Measure of model error on datapoint.
    */
-  def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]) : Double
+  def computeError(datum: LabeledPoint, prediction: Double) : Double
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index ed566a8fbb27..0ff8718f0d54 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -66,17 +66,13 @@ object SquaredError extends Loss {
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @param prediction: RDD[Double] of predicted labels.
-   * @return Mean squared error of model on data
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return Mean squared error of model on datapoint.
    */
-  override def computeError(data: RDD[LabeledPoint], prediction: RDD[Double]): Double = {
-    val errorAcrossSamples = (data zip prediction) map {
-      case (yTrue, yPred) =>
-        val err = yPred - yTrue.label
-        err * err
-    }
-    errorAcrossSamples.mean()
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val err = prediction - datum.label
+    err * err
   }
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 30a8f7ca301a..b4dc0e739829 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -28,9 +28,11 @@ import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.Algo
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.EnsembleCombiningStrategy._
+import org.apache.spark.mllib.tree.loss.Loss
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
@@ -108,6 +110,53 @@ class GradientBoostedTreesModel(
   }
 
   override protected def formatVersion: String = TreeEnsembleModel.SaveLoadV1_0.thisFormatVersion
+
+  /**
+   * Method to compute error or loss for every iteration of gradient boosting.
+   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param loss: evaluation metric.
+   * @return an array with index i having the losses or errors for the ensemble
+   *         containing trees 1 to i + 1
+   */
+  def evaluateEachIteration(
+      data: RDD[LabeledPoint],
+      loss: Loss) : Array[Double] = {
+
+    val sc = data.sparkContext
+    val remappedData = algo match {
+      case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+      case _ => data
+    }
+    val initialTree = trees(0)
+    val numIterations = trees.length
+    val evaluationArray = Array.fill(numIterations)(0.0)
+
+    // Initial weight is 1.0
+    var predictionErrorModel = remappedData.map {i =>
+      val pred = initialTree.predict(i.features)
+      val error = loss.computeError(i, pred)
+      (pred, error)
+    }
+    evaluationArray(0) = predictionErrorModel.values.mean()
+
+    // Avoid the model being copied across numIterations.
+    val broadcastTrees = sc.broadcast(trees)
+    val broadcastWeights = sc.broadcast(treeWeights)
+
+    (1 until numIterations).map {nTree =>
+      predictionErrorModel = (remappedData zip predictionErrorModel) map {
+        case (point, (pred, error)) => {
+          val newPred = pred + (
+            broadcastTrees.value(nTree).predict(point.features) * broadcastWeights.value(nTree))
+          val newError = loss.computeError(point, newPred)
+          (newPred, newError)
+        }
+      }
+      evaluationArray(nTree) = predictionErrorModel.values.mean()
+    }
+    evaluationArray
+  }
+
 }
 
 object GradientBoostedTreesModel extends Loader[GradientBoostedTreesModel] {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index dc930d3e3b67..2d90764eeb79 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -179,8 +179,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
       assert(numTrees !== numIterations)
 
       // Test that it performs better on the validation dataset.
-      val gbtModel = new GradientBoostedTrees(boostingStrategy)
-      val gbt = gbtModel.run(trainRdd)
+      val gbt = new GradientBoostedTrees(boostingStrategy).run(trainRdd)
       val (errorWithoutValidation, errorWithValidation) = {
         if (algo == Classification) {
           val remappedRdd = validateRdd.map(x => new LabeledPoint(2 * x.label - 1, x.features))
@@ -193,7 +192,7 @@
 
       // Test that results from evaluateEachIteration comply with runWithValidation.
       // Note that convergenceTol is set to 0.0
-      val evaluationArray = gbtModel.evaluateEachIteration(validateRdd)
+      val evaluationArray = gbt.evaluateEachIteration(validateRdd, loss)
       assert(evaluationArray.length === numIterations)
       assert(evaluationArray(numTrees) > evaluationArray(numTrees - 1))
       var i = 1

From 6e8aa106ff95b446b308996edca98cad8f410813 Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Wed, 11 Mar 2015 13:43:32 +0530
Subject: [PATCH 3/5] Made the following changes

Used mapPartition instead of map
Refactored computeError and unpersisted broadcast variables

---
 .../spark/mllib/tree/loss/AbsoluteError.scala | 23 ++---------
 .../spark/mllib/tree/loss/LogLoss.scala       | 23 ++---------
 .../apache/spark/mllib/tree/loss/Loss.scala   | 10 +++--
 .../spark/mllib/tree/loss/SquaredError.scala  | 23 ++---------
 .../mllib/tree/model/treeEnsembleModels.scala | 39 +++++++++++--------
 .../tree/GradientBoostedTreesSuite.scala      |  2 +-
 6 files changed, 40 insertions(+), 80 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
index f69960ee3714..4bed11a80c7c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -47,30 +47,15 @@ object AbsoluteError extends Loss {
     if ((point.label - model.predict(point.features)) < 0) 1.0 else -1.0
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean absolute error of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { y =>
-      val err = model.predict(y.features) - y.label
-      math.abs(err)
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
-   * @return Absolute error of model on the given datapoint.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint.
+   * @return Absolute error of model on the given datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
     val err = datum.label - prediction
     math.abs(err)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
index 06ec62ea8252..76af56bd954f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -50,32 +50,15 @@ object LogLoss extends Loss {
     - 4.0 * point.label / (1.0 + math.exp(2.0 * point.label * prediction))
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean log loss of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { case point =>
-      val prediction = model.predict(point.features)
-      val margin = 2.0 * point.label * prediction
-      // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
-      2.0 * MLUtils.log1pExp(-margin)
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
    * @return log loss of model on the datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
-    val margin = 2.0 * datum.label * prediction
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
+    val margin = 2.0 * datum.label * prediction
     // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
     2.0 * MLUtils.log1pExp(-margin)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
index 62d9c86bb14a..7bb0a51e2c78 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -47,16 +47,18 @@ trait Loss extends Serializable {
    * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
    * @return Measure of model error on data
    */
-  def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double
+  def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
+    data.map(point => computeError(model.predict(point.features), point)).mean()
+  }
 
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
-   * @return Measure of model error on datapoint.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
+   * @return Measure of model error on datapoint.
    */
-  def computeError(datum: LabeledPoint, prediction: Double) : Double
+  def computeError(prediction: Double, datum: LabeledPoint): Double
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index 0ff8718f0d54..cfe548b1d002 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -47,30 +47,15 @@ object SquaredError extends Loss {
     2.0 * (model.predict(point.features) - point.label)
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean squared error of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { y =>
-      val err = model.predict(y.features) - y.label
-      err * err
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
-   * @return Mean squared error of model on datapoint.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
+   * @return Mean squared error of model on datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
     val err = prediction - datum.label
     err * err
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index b4dc0e739829..1eee8e96f797 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -113,47 +113,52 @@ class GradientBoostedTreesModel(
 
   /**
    * Method to compute error or loss for every iteration of gradient boosting.
-   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
-   * @param loss: evaluation metric.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param loss evaluation metric.
    * @return an array with index i having the losses or errors for the ensemble
    *         containing trees 1 to i + 1
    */
   def evaluateEachIteration(
       data: RDD[LabeledPoint],
-      loss: Loss) : Array[Double] = {
+      loss: Loss): Array[Double] = {
 
     val sc = data.sparkContext
     val remappedData = algo match {
       case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
       case _ => data
     }
-    val initialTree = trees(0)
+
     val numIterations = trees.length
     val evaluationArray = Array.fill(numIterations)(0.0)
 
-    // Initial weight is 1.0
-    var predictionErrorModel = remappedData.map {i =>
-      val pred = initialTree.predict(i.features)
-      val error = loss.computeError(i, pred)
+    var predictionAndError: RDD[(Double, Double)] = remappedData.map { i =>
+      val pred = treeWeights(0) * trees(0).predict(i.features)
+      val error = loss.computeError(pred, i)
       (pred, error)
     }
-    evaluationArray(0) = predictionErrorModel.values.mean()
+    evaluationArray(0) = predictionAndError.values.mean()
 
     // Avoid the model being copied across numIterations.
     val broadcastTrees = sc.broadcast(trees)
     val broadcastWeights = sc.broadcast(treeWeights)
 
-    (1 until numIterations).map {nTree =>
-      predictionErrorModel = (remappedData zip predictionErrorModel) map {
-        case (point, (pred, error)) => {
-          val newPred = pred + (
-            broadcastTrees.value(nTree).predict(point.features) * broadcastWeights.value(nTree))
-          val newError = loss.computeError(point, newPred)
-          (newPred, newError)
+    (1 until numIterations).map { nTree =>
+      val currentTree = broadcastTrees.value(nTree)
+      val currentTreeWeight = broadcastWeights.value(nTree)
+      predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter =>
+        iter map {
+          case (point, (pred, error)) => {
+            val newPred = pred + currentTree.predict(point.features) * currentTreeWeight
+            val newError = loss.computeError(newPred, point)
+            (newPred, newError)
+          }
         }
       }
-      evaluationArray(nTree) = predictionErrorModel.values.mean()
+      evaluationArray(nTree) = predictionAndError.values.mean()
    }
+
+    broadcastTrees.unpersist()
+    broadcastWeights.unpersist()
     evaluationArray
   }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index 2d90764eeb79..55b0bac7d49f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -197,7 +197,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
       var i = 1
       while (i < numTrees) {
-        assert(evaluationArray(i) < evaluationArray(i - 1))
+        assert(evaluationArray(i) <= evaluationArray(i - 1))
         i += 1
       }
     }

From 352001fc81912e273159580225b3db2a1cac06aa Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Sat, 14 Mar 2015 23:41:02 +0530
Subject: [PATCH 4/5] Minor

---
 .../apache/spark/mllib/tree/loss/AbsoluteError.scala | 12 ++----------
 .../org/apache/spark/mllib/tree/loss/LogLoss.scala   | 12 ++----------
 .../org/apache/spark/mllib/tree/loss/Loss.scala      |  6 +++---
 .../apache/spark/mllib/tree/loss/SquaredError.scala  | 12 ++----------
 .../spark/mllib/tree/model/treeEnsembleModels.scala  | 10 +++++-----
 5 files changed, 14 insertions(+), 38 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
index 4bed11a80c7c..793dd664c5d5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -47,16 +47,8 @@ object AbsoluteError extends Loss {
     if ((point.label - model.predict(point.features)) < 0) 1.0 else -1.0
   }
 
-  /**
-   * Method to calculate loss when the predictions are already known.
-   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
-   * predicted values from previously fit trees.
-   * @param prediction Predicted label.
-   * @param datum LabeledPoint.
-   * @return Absolute error of model on the given datapoint.
-   */
-  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
-    val err = datum.label - prediction
+  override def computeError(prediction: Double, label: Double): Double = {
+    val err = label - prediction
     math.abs(err)
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
index 76af56bd954f..51b1aed167b6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -50,16 +50,8 @@ object LogLoss extends Loss {
     - 4.0 * point.label / (1.0 + math.exp(2.0 * point.label * prediction))
   }
 
-  /**
-   * Method to calculate loss when the predictions are already known.
-   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
-   * predicted values from previously fit trees.
-   * @param prediction Predicted label.
-   * @param datum LabeledPoint
-   * @return log loss of model on the datapoint.
-   */
-  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
-    val margin = 2.0 * datum.label * prediction
+  override def computeError(prediction: Double, label: Double): Double = {
+    val margin = 2.0 * label * prediction
     // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
     2.0 * MLUtils.log1pExp(-margin)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
index 7bb0a51e2c78..357869ff6b33 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -48,7 +48,7 @@ trait Loss extends Serializable {
    * @return Measure of model error on data
    */
   def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map(point => computeError(model.predict(point.features), point)).mean()
+    data.map(point => computeError(model.predict(point.features), point.label)).mean()
   }
 
   /**
@@ -56,9 +56,9 @@
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
    * @param prediction Predicted label.
-   * @param datum LabeledPoint
+   * @param label True label.
    * @return Measure of model error on datapoint.
    */
-  def computeError(prediction: Double, datum: LabeledPoint): Double
+  def computeError(prediction: Double, label: Double): Double
 
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index cfe548b1d002..b990707ca452 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -47,16 +47,8 @@ object SquaredError extends Loss {
     2.0 * (model.predict(point.features) - point.label)
   }
 
-  /**
-   * Method to calculate loss when the predictions are already known.
-   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
-   * predicted values from previously fit trees.
-   * @param prediction Predicted label.
-   * @param datum LabeledPoint
-   * @return Mean squared error of model on datapoint.
-   */
-  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
-    val err = prediction - datum.label
+  override def computeError(prediction: Double, label: Double): Double = {
+    val err = prediction - label
     err * err
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 1eee8e96f797..0e55e2418407 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -133,7 +133,7 @@ class GradientBoostedTreesModel(
 
     var predictionAndError: RDD[(Double, Double)] = remappedData.map { i =>
       val pred = treeWeights(0) * trees(0).predict(i.features)
-      val error = loss.computeError(pred, i)
+      val error = loss.computeError(pred, i.label)
       (pred, error)
     }
     evaluationArray(0) = predictionAndError.values.mean()
@@ -143,13 +143,13 @@
 
     (1 until numIterations).map { nTree =>
-      val currentTree = broadcastTrees.value(nTree)
-      val currentTreeWeight = broadcastWeights.value(nTree)
       predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter =>
-        iter map {
+        val currentTree = broadcastTrees.value(nTree)
+        val currentTreeWeight = broadcastWeights.value(nTree)
+        iter.map {
           case (point, (pred, error)) => {
             val newPred = pred + currentTree.predict(point.features) * currentTreeWeight
-            val newError = loss.computeError(newPred, point)
+            val newError = loss.computeError(newPred, point.label)
             (newPred, newError)
           }
         }

From 67146abf82938425b33d52238ef0e5e72ed8ca6e Mon Sep 17 00:00:00 2001
From: MechCoder
Date: Tue, 17 Mar 2015 11:42:38 +0530
Subject: [PATCH 5/5] Minor

---
 .../org/apache/spark/mllib/tree/model/treeEnsembleModels.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 0e55e2418407..b81d05515850 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -116,7 +116,7 @@ class GradientBoostedTreesModel(
    * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
    * @param loss evaluation metric.
    * @return an array with index i having the losses or errors for the ensemble
-   *         containing trees 1 to i + 1
+   *         containing the first i+1 trees
    */
   def evaluateEachIteration(
       data: RDD[LabeledPoint],
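
For context, a minimal usage sketch of the API this series adds (not part of the patches themselves): trainData and validationData are hypothetical RDD[LabeledPoint] datasets standing in for any training and validation split; the calls are the MLlib API as it stands after PATCH 5/5.

    import org.apache.spark.mllib.tree.GradientBoostedTrees
    import org.apache.spark.mllib.tree.configuration.BoostingStrategy
    import org.apache.spark.mllib.tree.loss.SquaredError

    // Configure and fit a regression GBT model on the training data.
    val boostingStrategy = BoostingStrategy.defaultParams("Regression")
    boostingStrategy.numIterations = 100
    val model = GradientBoostedTrees.train(trainData, boostingStrategy)

    // evaluationArray(i) is the validation loss of the ensemble containing
    // the first i + 1 trees.
    val evaluationArray = model.evaluateEachIteration(validationData, SquaredError)

    // Keep the number of trees that minimizes the validation loss.
    val bestNumIterations = evaluationArray.zipWithIndex.minBy(_._1)._2 + 1

Because evaluateEachIteration carries the running prediction forward from one iteration to the next (newPred = pred + currentTreeWeight * currentTree.predict(point.features), as in PATCH 3 above), each tree is scored only once per point, so extracting the whole learning curve costs time linear in the number of trees rather than quadratic.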