4 changes: 2 additions & 2 deletions docs/mllib-ensembles.md
@@ -464,8 +464,8 @@ first one being the training dataset and the second being the validation dataset
 The training is stopped when the improvement in the validation error is not more than a certain tolerance
 (supplied by the `validationTol` argument in `BoostingStrategy`). In practice, the validation error
 decreases initially and later increases. There might be cases in which the validation error does not change monotonically,
-and the user is advised to set a large enough negative tolerance and examine the validation curve to tune the number of
-iterations.
+and the user is advised to set a large enough negative tolerance and examine the validation curve using `evaluateEachIteration`
+(which gives the error or loss per iteration) to tune the number of iterations.

### Examples

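As an illustration of the workflow described above, a minimal sketch of picking the iteration count with the new API (trainingData and validationData are placeholder RDD[LabeledPoint] names; SquaredError is one of the provided losses):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.SquaredError

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

// One loss value per boosting iteration, computed on the validation set.
val errors = model.evaluateEachIteration(validationData, SquaredError)
// Choose the iteration count with the lowest validation loss.
val bestNumIterations = errors.indexOf(errors.min) + 1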
mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -61,4 +61,18 @@ object AbsoluteError extends Loss {
       math.abs(err)
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return Absolute error of model on the given datapoint.
+   */
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val err = datum.label - prediction
+    math.abs(err)
+  }
+
 }

Member: No need for the doc comment; it will be inherited from the overridden method (here and in the other two loss classes).

Contributor Author: But the doc for the return value is different, no?

Member: I think it's OK for the doc for gradient() and computeError() to be generic, as long as the doc for the loss classes describes the specific loss function.

Contributor Author: OK, so should I remove it?

Member: Yes, please.

mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -66,4 +66,19 @@ object LogLoss extends Loss {
       2.0 * MLUtils.log1pExp(-margin)
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return log loss of model on the datapoint.
+   */
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val margin = 2.0 * datum.label * prediction
+    // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
+    2.0 * MLUtils.log1pExp(-margin)
+  }
+
 }

10 changes: 10 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -49,4 +49,14 @@ trait Loss extends Serializable {
    */
   def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double

Member: Provide a default implementation using the other computeError, and then remove the overridden copies from the child classes.

Contributor Author: Great, it gives mental satisfaction to remove huge blocks of code :P

Member: : )

+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param datum: LabeledPoint

Member: No colon after the param name (here and elsewhere).

+   * @param prediction: Predicted label.
+   * @return Measure of model error on datapoint.
+   */
+  def computeError(datum: LabeledPoint, prediction: Double) : Double

Member: Switch the arg order to match the batch computeError more closely. Also, no space before the colon (here and elsewhere).

 }
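
A minimal sketch of the default implementation the reviewer is asking for, with the style fixes applied (args reordered to put the prediction first, no colon after param names, no space before the return-type colon); this is an illustration of the suggestion, not the final committed code:

  /**
   * Method to calculate error of the base learner for the gradient boosting calculation.
   * @param model Ensemble model
   * @param data Dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
   * @return Measure of model error on data
   */
  def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
    // Delegate to the per-datapoint overload so loss classes only implement that one.
    data.map(point => computeError(model.predict(point.features), point)).mean()
  }

  /**
   * Method to calculate loss when the predictions are already known.
   * @param prediction Predicted label
   * @param datum LabeledPoint
   * @return Measure of model error on the datapoint
   */
  def computeError(prediction: Double, datum: LabeledPoint): Double
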
mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -61,4 +61,18 @@ object SquaredError extends Loss {
       err * err
     }.mean()
   }
+
+  /**
+   * Method to calculate loss when the predictions are already known.
+   * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
+   * predicted values from previously fit trees.
+   * @param datum: LabeledPoint
+   * @param prediction: Predicted label.
+   * @return Mean squared error of model on datapoint.
+   */
+  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+    val err = prediction - datum.label
+    err * err
+  }
+
 }

mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -28,9 +28,11 @@ import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.Algo
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.EnsembleCombiningStrategy._
+import org.apache.spark.mllib.tree.loss.Loss
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
@@ -108,6 +110,53 @@ class GradientBoostedTreesModel(
   }

   override protected def formatVersion: String = TreeEnsembleModel.SaveLoadV1_0.thisFormatVersion
+
+  /**
+   * Method to compute error or loss for every iteration of gradient boosting.
+   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param loss: evaluation metric.
+   * @return an array with index i having the losses or errors for the ensemble
+   *         containing trees 1 to i + 1

Member: Minor: use 0-based indexing for the doc ("containing trees 0 to i"), or just say "containing the first i+1 trees".

+   */
+  def evaluateEachIteration(
+      data: RDD[LabeledPoint],
+      loss: Loss) : Array[Double] = {
+
+    val sc = data.sparkContext
+    val remappedData = algo match {
+      case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+      case _ => data
+    }
+    val initialTree = trees(0)

Member: Remove? It is only used once.

+    val numIterations = trees.length
+    val evaluationArray = Array.fill(numIterations)(0.0)
+
+    // Initial weight is 1.0

Member: May as well use the initial weight explicitly, in case it changes for some reason in the future.

+    var predictionErrorModel = remappedData.map {i =>

Member: predictionErrorModel is an odd name (model?). I'd rename it to predictionAndError and possibly add an explicit type for clarity.

+      val pred = initialTree.predict(i.features)
+      val error = loss.computeError(i, pred)
+      (pred, error)
+    }
+    evaluationArray(0) = predictionErrorModel.values.mean()
+
+    // Avoid the model being copied across numIterations.
+    val broadcastTrees = sc.broadcast(trees)
+    val broadcastWeights = sc.broadcast(treeWeights)
+
+    (1 until numIterations).map {nTree =>

Member: Space after {.

+      predictionErrorModel = (remappedData zip predictionErrorModel) map {

Member: I would use mapPartitions; before iterating over the partition elements, extract the trees and weights from the broadcast variables. I believe that reduces overhead a little. Also, try to avoid infix notation, since non-Scala people may not be used to it: remappedData.zip(predictionErrorModel).

+        case (point, (pred, error)) => {
+          val newPred = pred + (

Member: Style: parenthesis on the next line.

+            broadcastTrees.value(nTree).predict(point.features) * broadcastWeights.value(nTree))
+          val newError = loss.computeError(point, newPred)
+          (newPred, newError)
+        }
+      }
+      evaluationArray(nTree) = predictionErrorModel.values.mean()
+    }
+    evaluationArray

Member: You might want to explicitly unpersist the broadcast values before returning. They will get unpersisted once their values go out of scope, but it might take longer.

+  }
+
 }

 object GradientBoostedTreesModel extends Loader[GradientBoostedTreesModel] {
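
For reference, a minimal sketch of how evaluateEachIteration might look with the reviewers' suggestions folded in (inlined and locally captured first tree, explicit initial weight, predictionAndError rename with an explicit type, mapPartitions with the broadcast values extracted once per partition, no infix notation, and explicit unpersist); it keeps the PR's current computeError(datum, prediction) argument order and is an illustration, not the final committed code:

  def evaluateEachIteration(
      data: RDD[LabeledPoint],
      loss: Loss): Array[Double] = {

    val sc = data.sparkContext
    val remappedData = algo match {
      case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
      case _ => data
    }

    val numIterations = trees.length
    val evaluationArray = Array.fill(numIterations)(0.0)

    // Capture the first tree and its weight locally so the closure does not
    // serialize the whole model; use the weight explicitly instead of assuming 1.0.
    val firstTree = trees(0)
    val firstTreeWeight = treeWeights(0)
    var predictionAndError: RDD[(Double, Double)] = remappedData.map { point =>
      val pred = firstTreeWeight * firstTree.predict(point.features)
      val error = loss.computeError(point, pred)
      (pred, error)
    }
    evaluationArray(0) = predictionAndError.values.mean()

    // Broadcast once so the ensemble is not shipped with every task.
    val broadcastTrees = sc.broadcast(trees)
    val broadcastWeights = sc.broadcast(treeWeights)

    (1 until numIterations).foreach { nTree =>
      predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter =>
        // Extract broadcast values once per partition to reduce overhead.
        val currentTree = broadcastTrees.value(nTree)
        val currentWeight = broadcastWeights.value(nTree)
        iter.map { case (point, (pred, _)) =>
          val newPred = pred + currentTree.predict(point.features) * currentWeight
          val newError = loss.computeError(point, newPred)
          (newPred, newError)
        }
      }
      evaluationArray(nTree) = predictionAndError.values.mean()
    }

    // Unpersist the broadcast values promptly instead of waiting for GC.
    broadcastTrees.unpersist()
    broadcastWeights.unpersist()
    evaluationArray
  }
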
mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -175,10 +175,11 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
       new BoostingStrategy(treeStrategy, loss, numIterations, validationTol = 0.0)
     val gbtValidate = new GradientBoostedTrees(boostingStrategy)
       .runWithValidation(trainRdd, validateRdd)
-    assert(gbtValidate.numTrees !== numIterations)
+    val numTrees = gbtValidate.numTrees
+    assert(numTrees !== numIterations)

     // Test that it performs better on the validation dataset.
-    val gbt = GradientBoostedTrees.train(trainRdd, boostingStrategy)
+    val gbt = new GradientBoostedTrees(boostingStrategy).run(trainRdd)
     val (errorWithoutValidation, errorWithValidation) = {
       if (algo == Classification) {
         val remappedRdd = validateRdd.map(x => new LabeledPoint(2 * x.label - 1, x.features))
@@ -188,6 +189,17 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
       }
     }
     assert(errorWithValidation <= errorWithoutValidation)
+
+    // Test that results from evaluateEachIteration comply with runWithValidation.
+    // Note that convergenceTol is set to 0.0
+    val evaluationArray = gbt.evaluateEachIteration(validateRdd, loss)
+    assert(evaluationArray.length === numIterations)
+    assert(evaluationArray(numTrees) > evaluationArray(numTrees - 1))
+    var i = 1
+    while (i < numTrees) {
+      assert(evaluationArray(i) < evaluationArray(i - 1))

Member: Small issue: < should be <=.

+      i += 1
+    }
     }
   }
 }
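
A small sketch of the loop with the reviewer's fix applied (the strict < relaxed to <=, since consecutive iterations can leave the validation error unchanged when the tolerance is 0.0):

    var i = 1
    while (i < numTrees) {
      // Validation error should be non-increasing up to the early-stopping point.
      assert(evaluationArray(i) <= evaluationArray(i - 1))
      i += 1
    }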