From 5650e98a580544303dc1185568be992d9304707a Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 31 Jan 2017 15:01:07 -0800 Subject: [PATCH 01/17] Changed CrossValidator and TrainValidationSplit fit methods to evaluate models in parallel --- .../spark/ml/tuning/CrossValidator.scala | 43 +++++++++++++------ .../ml/tuning/TrainValidationSplit.scala | 33 ++++++++------ .../spark/ml/tuning/ValidatorParams.scala | 13 +++++- 3 files changed, 62 insertions(+), 27 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 2012d6ca8b5e..e911eec92be0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -51,7 +51,7 @@ private[ml] trait CrossValidatorParams extends ValidatorParams { /** @group getParam */ def getNumFolds: Int = $(numFolds) - setDefault(numFolds -> 3) + setDefault(numFolds -> 3, numParallelEval -> 1) } /** @@ -91,6 +91,10 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) + /** @group setParam */ + @Since("2.2.0") + def setNumParallelEval(value: Int): this.type = set(numParallelEval, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): CrossValidatorModel = { val schema = dataset.schema @@ -100,31 +104,44 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val eval = $(evaluator) val epm = $(estimatorParamMaps) val numModels = epm.length - val metrics = new Array[Double](epm.length) + val numPar = $(numParallelEval) val instr = Instrumentation.create(this, dataset) instr.logParams(numFolds, seed) logTuningParams(instr) + // Compute metrics for each model over each fold + logDebug(s"Running cross-validation with level of parallelism: $numPar.") val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed)) - splits.zipWithIndex.foreach { case ((training, validation), splitIndex) => + val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() val validationDataset = sparkSession.createDataFrame(validation, schema).cache() // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") - val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]] + + // Fit models concurrently, limited by using a sliding window over models + val models = epm.grouped(numPar).map { win => + win.par.map(est.fit(trainingDataset, _)) + }.toList.flatten.asInstanceOf[Seq[Model[_]]] trainingDataset.unpersist() - var i = 0 - while (i < numModels) { - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(models(i).transform(validationDataset, epm(i))) - logDebug(s"Got metric $metric for model trained with ${epm(i)}.") - metrics(i) += metric - i += 1 - } + + // Evaluate models concurrently, limited by using a sliding window over models + val foldMetrics = models.zip(epm).grouped(numPar).map { win => + win.par.map { m => + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(m._1.transform(validationDataset, m._2)) + logDebug(s"Got metric $metric for model trained with ${m._2}.") + metric + } + }.toList.flatten + validationDataset.unpersist() - } + foldMetrics + }.reduce((mA, mB) => mA.zip(mB).map(m => m._1 + m._2)).toArray + + //
Calculate average metric for all folds f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1) + logInfo(s"Average cross-validation metrics: ${metrics.toSeq}") val (bestMetric, bestIndex) = if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index db7c9d13d301..05d8a6d35090 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -50,7 +50,7 @@ private[ml] trait TrainValidationSplitParams extends ValidatorParams { /** @group getParam */ def getTrainRatio: Double = $(trainRatio) - setDefault(trainRatio -> 0.75) + setDefault(trainRatio -> 0.75, numParallelEval -> 1) } /** @@ -87,6 +87,10 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) + /** @group setParam */ + @Since("2.2.0") + def setNumParallelEval(value: Int): this.type = set(numParallelEval, value) + @Since("2.0.0") override def fit(dataset: Dataset[_]): TrainValidationSplitModel = { val schema = dataset.schema @@ -94,8 +98,8 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val est = $(estimator) val eval = $(evaluator) val epm = $(estimatorParamMaps) - val numModels = epm.length - val metrics = new Array[Double](epm.length) + val numPar = $(numParallelEval) + logDebug(s"Running validation with level of parallelism: $numPar.") val instr = Instrumentation.create(this, dataset) instr.logParams(trainRatio, seed) @@ -106,18 +110,21 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St trainingDataset.cache() validationDataset.cache() - // multi-model training logDebug(s"Train split with multiple sets of parameters.") - val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]] + // Fit models concurrently, limited by using a sliding window over models + val models = epm.grouped(numPar).map { win => + win.par.map(est.fit(trainingDataset, _)) + }.toList.flatten.asInstanceOf[Seq[Model[_]]] trainingDataset.unpersist() - var i = 0 - while (i < numModels) { - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(models(i).transform(validationDataset, epm(i))) - logDebug(s"Got metric $metric for model trained with ${epm(i)}.") - metrics(i) += metric - i += 1 - } + // Evaluate models concurrently, limited by using a sliding window over models + val metrics = models.zip(epm).grouped(numPar).map { win => + win.par.map { m => + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(m._1.transform(validationDataset, m._2)) + logDebug(s"Got metric $metric for model trained with ${m._2}.") + metric + } + }.toList.flatten.toArray validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}") diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index d55eb14d0345..1279d4afeb57 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -24,7 +24,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.ml.{Estimator, Model} import 
org.apache.spark.ml.evaluation.Evaluator -import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params} import org.apache.spark.ml.param.shared.HasSeed import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata @@ -67,6 +67,17 @@ private[ml] trait ValidatorParams extends HasSeed with Params { /** @group getParam */ def getEvaluator: Evaluator = $(evaluator) + /** + * param to control the number of models evaluated in parallel + * + * @group param + */ + val numParallelEval: IntParam = new IntParam(this, "numParallelEval", + "max number of models to evaluate in parallel, 1 for serial evaluation") + + /** @group getParam */ + def getNumParallelEval: Int = $(numParallelEval) + protected def transformSchemaImpl(schema: StructType): StructType = { require($(estimatorParamMaps).nonEmpty, s"Validator requires non-empty estimatorParamMaps") val firstEstimatorParamMap = $(estimatorParamMaps).head From 36a1a68055756707b5ebdc117aacd4d713cf7dae Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 14 Feb 2017 10:53:59 -0800 Subject: [PATCH 02/17] made closure vars more explicit, moved param default to trait --- .../apache/spark/ml/tuning/CrossValidator.scala | 16 +++++++++------- .../spark/ml/tuning/TrainValidationSplit.scala | 16 +++++++++------- .../apache/spark/ml/tuning/ValidatorParams.scala | 3 +++ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index e911eec92be0..f9bd5c6df488 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -51,7 +51,7 @@ private[ml] trait CrossValidatorParams extends ValidatorParams { /** @group getParam */ def getNumFolds: Int = $(numFolds) - setDefault(numFolds -> 3, numParallelEval -> 1) + setDefault(numFolds -> 3) } /** @@ -119,18 +119,20 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") - // Fit models concurrently, limited by using a sliding window over models + // Fit models concurrently, limited by a sliding window of size 'numPar' over estimator params val models = epm.grouped(numPar).map { win => - win.par.map(est.fit(trainingDataset, _)) + win.par.map { paramMap => + est.fit(trainingDataset, paramMap) + } }.toList.flatten.asInstanceOf[Seq[Model[_]]] trainingDataset.unpersist() - // Evaluate models concurrently, limited by using a sliding window over models + // Evaluate models concurrently, limited by a sliding window of size 'numPar' over models val foldMetrics = models.zip(epm).grouped(numPar).map { win => - win.par.map { m => + win.par.map { case (model, paramMap) => // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(m._1.transform(validationDataset, m._2)) - logDebug(s"Got metric $metric for model trained with ${m._2}.") + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") metric } }.toList.flatten diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 05d8a6d35090..dd5529686ecf 100644 --- 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -50,7 +50,7 @@ private[ml] trait TrainValidationSplitParams extends ValidatorParams { /** @group getParam */ def getTrainRatio: Double = $(trainRatio) - setDefault(trainRatio -> 0.75, numParallelEval -> 1) + setDefault(trainRatio -> 0.75) } /** @@ -111,17 +111,19 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St validationDataset.cache() logDebug(s"Train split with multiple sets of parameters.") - // Fit models concurrently, limited by using a sliding window over models + // Fit models concurrently, limited by a sliding window of size 'numPar' over estimator params val models = epm.grouped(numPar).map { win => - win.par.map(est.fit(trainingDataset, _)) + win.par.map { paramMap => + est.fit(trainingDataset, paramMap) + } }.toList.flatten.asInstanceOf[Seq[Model[_]]] trainingDataset.unpersist() - // Evaluate models concurrently, limited by using a sliding window over models + // Evaluate models concurrently, limited by a sliding window of size 'numPar' over models val metrics = models.zip(epm).grouped(numPar).map { win => - win.par.map { m => + win.par.map { case (model, paramMap) => // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(m._1.transform(validationDataset, m._2)) - logDebug(s"Got metric $metric for model trained with ${m._2}.") + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") metric } }.toList.flatten.toArray diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 1279d4afeb57..93106f368c53 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -69,6 +69,7 @@ private[ml] trait ValidatorParams extends HasSeed with Params { /** * param to control the number of models evaluated in parallel + * Default: 1 * * @group param */ @@ -96,6 +97,8 @@ private[ml] trait ValidatorParams extends HasSeed with Params { instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName) instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length) } + + setDefault(numParallelEval -> 1) } private[ml] object ValidatorParams { From b051afad0291d6f07aa3a7594b27d1958450cdaa Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 14 Feb 2017 10:55:40 -0800 Subject: [PATCH 03/17] added paramvalidator for numParallelEval to ensure >=1 --- .../scala/org/apache/spark/ml/tuning/ValidatorParams.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 93106f368c53..404fadc1532e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -24,7 +24,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.SparkContext import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator -import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param._ import 
org.apache.spark.ml.param.shared.HasSeed import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata @@ -74,7 +74,8 @@ private[ml] trait ValidatorParams extends HasSeed with Params { * @group param */ val numParallelEval: IntParam = new IntParam(this, "numParallelEval", - "max number of models to evaluate in parallel, 1 for serial evaluation") + "max number of models to evaluate in parallel, 1 for serial evaluation", + ParamValidators.gtEq(1)) /** @group getParam */ def getNumParallelEval: Int = $(numParallelEval) From 46fe252f6bdf57c4b9369645c66160e000815644 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 15 Feb 2017 14:55:05 -0800 Subject: [PATCH 04/17] added test cases for CrossValidation and TrainValidationSplit --- .../spark/ml/tuning/CrossValidatorSuite.scala | 27 ++++++++++++++ .../ml/tuning/TrainValidationSplitSuite.scala | 35 +++++++++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 7116265474f2..abf583ddc121 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -121,6 +121,33 @@ class CrossValidatorSuite } } + test("cross validation with parallel evaluation") { + val lr = new LogisticRegression + val lrParamMaps = new ParamGridBuilder() + .addGrid(lr.regParam, Array(0.001, 1000.0)) + .addGrid(lr.maxIter, Array(0, 3)) + .build() + val eval = new BinaryClassificationEvaluator + val cv = new CrossValidator() + .setEstimator(lr) + .setEstimatorParamMaps(lrParamMaps) + .setEvaluator(eval) + .setNumFolds(2) + .setNumParallelEval(1) + val cvSerialModel = cv.fit(dataset) + cv.setNumParallelEval(2) + val cvParallelModel = cv.fit(dataset) + + val serialMetrics = cvSerialModel.avgMetrics.sorted + val parallelMetrics = cvParallelModel.avgMetrics.sorted + assert(serialMetrics === parallelMetrics) + + val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] + val parentParallel = cvParallelModel.bestModel.parent.asInstanceOf[LogisticRegression] + assert(parentSerial.getRegParam === parentParallel.getRegParam) + assert(parentSerial.getMaxIter === parentParallel.getMaxIter) + } + test("read/write: CrossValidator with simple estimator") { val lr = new LogisticRegression().setMaxIter(3) val evaluator = new BinaryClassificationEvaluator() diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 4463a9b6e543..7dbdf9f08080 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -36,9 +36,14 @@ class TrainValidationSplitSuite import testImplicits._ - test("train validation with logistic regression") { - val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF() + @transient var dataset: Dataset[_] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF() + } + test("train validation with logistic regression") { val lr = new LogisticRegression val lrParamMaps = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.001, 1000.0)) @@ -118,6 +123,32 @@ class TrainValidationSplitSuite } } + 
test("train validation with parallel evaluation") { + val lr = new LogisticRegression + val lrParamMaps = new ParamGridBuilder() + .addGrid(lr.regParam, Array(0.001, 1000.0)) + .addGrid(lr.maxIter, Array(0, 3)) + .build() + val eval = new BinaryClassificationEvaluator + val cv = new TrainValidationSplit() + .setEstimator(lr) + .setEstimatorParamMaps(lrParamMaps) + .setEvaluator(eval) + .setNumParallelEval(1) + val cvSerialModel = cv.fit(dataset) + cv.setNumParallelEval(2) + val cvParallelModel = cv.fit(dataset) + + val serialMetrics = cvSerialModel.validationMetrics.sorted + val parallelMetrics = cvParallelModel.validationMetrics.sorted + assert(serialMetrics === parallelMetrics) + + val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] + val parentParallel = cvParallelModel.bestModel.parent.asInstanceOf[LogisticRegression] + assert(parentSerial.getRegParam === parentParallel.getRegParam) + assert(parentSerial.getMaxIter === parentParallel.getMaxIter) + } + test("read/write: TrainValidationSplit") { val lr = new LogisticRegression().setMaxIter(3) val evaluator = new BinaryClassificationEvaluator() From 1274ba403a2340110e91c6617bd188a58df7d840 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 15 Feb 2017 15:10:51 -0800 Subject: [PATCH 05/17] added numParallelEval param usage to examples --- .../examples/ml/ModelSelectionViaCrossValidationExample.scala | 1 + .../ml/ModelSelectionViaTrainValidationSplitExample.scala | 2 ++ 2 files changed, 3 insertions(+) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala index c1ff9ef52170..c27748d53cfd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala @@ -93,6 +93,7 @@ object ModelSelectionViaCrossValidationExample { .setEvaluator(new BinaryClassificationEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // Use 3+ in practice + .setNumParallelEval(2) // Evaluate up to 2 parameter settings in parallel // Run cross-validation, and choose the best set of parameters. val cvModel = cv.fit(training) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala index 1cd2641f9a8d..328693638be0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala @@ -65,6 +65,8 @@ object ModelSelectionViaTrainValidationSplitExample { .setEstimatorParamMaps(paramGrid) // 80% of the data will be used for training and the remaining 20% for validation. .setTrainRatio(0.8) + // Evaluate up to 2 parameter settings in parallel + .setNumParallelEval(2) // Run train validation split, and choose the best set of parameters. 
val model = trainValidationSplit.fit(training) From 80ac2fdbba98d4954b9b0c3e70712fba255486c2 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 16 Feb 2017 10:39:55 -0800 Subject: [PATCH 06/17] added documentation to ml-tuning --- docs/ml-tuning.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md index a135adc4334c..d3e31f5858da 100644 --- a/docs/ml-tuning.md +++ b/docs/ml-tuning.md @@ -55,6 +55,9 @@ for multiclass problems. The default metric used to choose the best `ParamMap` c method in each of these evaluators. To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility. +Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`. +The value of `numParallelEval` should be chosen carefully to maximize concurrency without exceeding cluster resources. Generally speaking, a value up to 10 should be sufficient for most clusters. + # Cross-Validation From 81267106006514ef6e9967d86c8c9b1d8e918c7b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 16 Feb 2017 14:00:37 -0800 Subject: [PATCH 07/17] changed sliding window limit to use a semaphore instead to prevent waiting for entire window to finish --- .../spark/ml/tuning/CrossValidator.scala | 37 ++++++++++--------- .../ml/tuning/TrainValidationSplit.scala | 36 +++++++++--------- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index f9bd5c6df488..e7b3bbb58905 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} +import java.util.concurrent.Semaphore import scala.collection.JavaConverters._ @@ -104,14 +105,15 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val eval = $(evaluator) val epm = $(estimatorParamMaps) val numModels = epm.length - val numPar = $(numParallelEval) + val numParBarrier = new Semaphore($(numParallelEval)) val instr = Instrumentation.create(this, dataset) instr.logParams(numFolds, seed) logTuningParams(instr) // Compute metrics for each model over each fold - logDebug(s"Running cross-validation with level of parallelism: $numPar.") + logDebug("Running cross-validation with level of parallelism: " + + s"${numParBarrier.availablePermits()}.") val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed)) val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() @@ -119,23 +121,24 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") - // Fit models concurrently, limited by a sliding window of size 'numPar' over estimator params - val models = epm.grouped(numPar).map { win => - win.par.map { paramMap => - est.fit(trainingDataset, paramMap) - } - }.toList.flatten.asInstanceOf[Seq[Model[_]]] + // Fit models concurrently, limited by a barrier with '$numParallelEval' permits + val models = epm.par.map { 
paramMap => + numParBarrier.acquire() + val model = est.fit(trainingDataset, paramMap) + numParBarrier.release() + model.asInstanceOf[Model[_]] + }.seq trainingDataset.unpersist() - // Evaluate models concurrently, limited by a sliding window of size 'numPar' over models - val foldMetrics = models.zip(epm).grouped(numPar).map { win => - win.par.map { case (model, paramMap) => - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - } - }.toList.flatten + // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits + val foldMetrics = models.zip(epm).par.map { case (model, paramMap) => + numParBarrier.acquire() + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + numParBarrier.release() + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric + }.seq validationDataset.unpersist() foldMetrics diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index dd5529686ecf..5908d00abd21 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -18,6 +18,7 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} +import java.util.concurrent.Semaphore import scala.collection.JavaConverters._ import scala.language.existentials @@ -98,8 +99,8 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val est = $(estimator) val eval = $(evaluator) val epm = $(estimatorParamMaps) - val numPar = $(numParallelEval) - logDebug(s"Running validation with level of parallelism: $numPar.") + val numParBarrier = new Semaphore($(numParallelEval)) + logDebug(s"Running validation with level of parallelism: ${numParBarrier.availablePermits()}.") val instr = Instrumentation.create(this, dataset) instr.logParams(trainRatio, seed) @@ -111,22 +112,23 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St validationDataset.cache() logDebug(s"Train split with multiple sets of parameters.") - // Fit models concurrently, limited by a sliding window of size 'numPar' over estimator params - val models = epm.grouped(numPar).map { win => - win.par.map { paramMap => - est.fit(trainingDataset, paramMap) - } - }.toList.flatten.asInstanceOf[Seq[Model[_]]] + // Fit models concurrently, limited by a barrier with '$numParallelEval' permits + val models = epm.par.map { paramMap => + numParBarrier.acquire() + val model = est.fit(trainingDataset, paramMap) + numParBarrier.release() + model.asInstanceOf[Model[_]] + }.seq trainingDataset.unpersist() - // Evaluate models concurrently, limited by a sliding window of size 'numPar' over models - val metrics = models.zip(epm).grouped(numPar).map { win => - win.par.map { case (model, paramMap) => - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - } - }.toList.flatten.toArray + // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits + val metrics = models.zip(epm).par.map { case (model, paramMap) => + numParBarrier.acquire() + // 
TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + numParBarrier.release() + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric + }.seq.toArray validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}") From 6a9b73585a0da0960f9a25be24e852e0b1c3e01d Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 16 Feb 2017 14:42:48 -0800 Subject: [PATCH 08/17] added note about parallelism capped by Scala collection thread pool, adjusted comments --- docs/ml-tuning.md | 2 +- .../scala/org/apache/spark/ml/tuning/CrossValidator.scala | 4 +++- .../org/apache/spark/ml/tuning/TrainValidationSplit.scala | 6 +++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md index d3e31f5858da..6f3b0f48292c 100644 --- a/docs/ml-tuning.md +++ b/docs/ml-tuning.md @@ -56,7 +56,7 @@ method in each of these evaluators. To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility. Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`. -The value of `numParallelEval` should be chosen carefully to maximize concurrency without exceeding cluster resources. Generally speaking, a value up to 10 should be sufficient for most clusters. +The value of `numParallelEval` should be chosen carefully to maximize concurrency without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters. 
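To make the throttling scheme that patches 07 and 08 settle on easier to follow, here is a self-contained sketch of the same pattern. It is illustrative only: the squaring step stands in for the real `est.fit`/`eval.evaluate` calls, and the try/finally around the permit is an addition of this sketch, not code from the patch.

```scala
import java.util.concurrent.Semaphore

// A semaphore with numParallelEval permits bounds how many tasks run at
// once; the parallel collection supplies the worker threads, so effective
// parallelism is also capped by its default pool size (the number of
// cores on the driver), as the NOTE in patch 08 points out.
val numParallelEval = 2
val barrier = new Semaphore(numParallelEval)
val results = (1 to 8).par.map { i =>
  barrier.acquire()
  try i * i // stand-in for fitting and evaluating one model
  finally barrier.release()
}.seq
println(results)
```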
# Cross-Validation diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index e7b3bbb58905..c936fc1d49d3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -105,6 +105,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val eval = $(evaluator) val epm = $(estimatorParamMaps) val numModels = epm.length + // Barrier to limit parallelism during model fit/evaluation + // NOTE: will be capped by size of thread pool used in Scala parallel collections, which is + // number of cores in the system by default val numParBarrier = new Semaphore($(numParallelEval)) val instr = Instrumentation.create(this, dataset) @@ -118,7 +121,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() val validationDataset = sparkSession.createDataFrame(validation, schema).cache() - // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") // Fit models concurrently, limited by a barrier with '$numParallelEval' permits diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 5908d00abd21..d81d99df19eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -99,6 +99,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val est = $(estimator) val eval = $(evaluator) val epm = $(estimatorParamMaps) + // Barrier to limit parallelism during model fit/evaluation + // NOTE: will be capped by size of thread pool used in Scala parallel collections, which is + // number of cores in the system by default val numParBarrier = new Semaphore($(numParallelEval)) logDebug(s"Running validation with level of parallelism: ${numParBarrier.availablePermits()}.") @@ -111,8 +114,8 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St trainingDataset.cache() validationDataset.cache() - logDebug(s"Train split with multiple sets of parameters.") // Fit models concurrently, limited by a barrier with '$numParallelEval' permits + logDebug(s"Train split with multiple sets of parameters.") val models = epm.par.map { paramMap => numParBarrier.acquire() val model = est.fit(trainingDataset, paramMap) @@ -120,6 +123,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St model.asInstanceOf[Model[_]] }.seq trainingDataset.unpersist() + // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits val metrics = models.zip(epm).par.map { case (model, paramMap) => numParBarrier.acquire() From 1c2e391e74696c60690f477fe4e0b636f0d98318 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 27 Feb 2017 17:29:33 -0800 Subject: [PATCH 09/17] reworked to use ExecutorService and Futures --- .../spark/ml/tuning/CrossValidator.scala | 75 +++++++++++-------- .../ml/tuning/TrainValidationSplit.scala | 59 +++++++++------ .../spark/ml/tuning/ValidatorParams.scala | 24 ++++++ 3 files changed, 105 insertions(+), 53 deletions(-) diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index c936fc1d49d3..8894d81cfe6c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -18,9 +18,10 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} -import java.util.concurrent.Semaphore import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration import com.github.fommil.netlib.F2jBLAS import org.apache.hadoop.fs.Path @@ -28,13 +29,15 @@ import org.json4s.DefaultFormats import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml._ +import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator -import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} import org.apache.spark.ml.util._ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ThreadUtils + /** * Params for [[CrossValidator]] and [[CrossValidatorModel]]. @@ -105,48 +108,58 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val eval = $(evaluator) val epm = $(estimatorParamMaps) val numModels = epm.length - // Barrier to limit parallelism during model fit/evaluation - // NOTE: will be capped by size of thread pool used in Scala parallel collections, which is - // number of cores in the system by default - val numParBarrier = new Semaphore($(numParallelEval)) + + // Create execution context, run in serial if numParallelEval is 1 + val executionContext = $(numParallelEval) match { + case 1 => + ThreadUtils.sameThread + case n => + ExecutionContext.fromExecutorService(executorServiceFactory(n)) + } val instr = Instrumentation.create(this, dataset) instr.logParams(numFolds, seed) logTuningParams(instr) - // Compute metrics for each model over each fold - logDebug("Running cross-validation with level of parallelism: " + - s"${numParBarrier.availablePermits()}.") + // Compute metrics for each model over each split + logDebug(s"Running cross-validation with level of parallelism: $numParallelEval.") val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed)) val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() val validationDataset = sparkSession.createDataFrame(validation, schema).cache() logDebug(s"Train split $splitIndex with multiple sets of parameters.") - // Fit models concurrently, limited by a barrier with '$numParallelEval' permits - val models = epm.par.map { paramMap => - numParBarrier.acquire() - val model = est.fit(trainingDataset, paramMap) - numParBarrier.release() - model.asInstanceOf[Model[_]] - }.seq - trainingDataset.unpersist() - - // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits - val foldMetrics = models.zip(epm).par.map { case (model, paramMap) => - numParBarrier.acquire() - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - numParBarrier.release() - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - }.seq - + // Fit models in a Future with 
thread-pool size determined by '$numParallelEval' + val models = epm.map { paramMap => + Future[Model[_]] { + val model = est.fit(trainingDataset, paramMap) + model.asInstanceOf[Model[_]] + } (executionContext) + } + + Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => + trainingDataset.unpersist() + } (executionContext) + + // Evaluate models in a Future with thread-pool size determined by '$numParallelEval' + val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) => + modelFuture.flatMap { model => + Future { + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric + } (executionContext) + } (executionContext) + } + + // Wait for metrics to be calculated before upersisting validation dataset + val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) validationDataset.unpersist() foldMetrics - }.reduce((mA, mB) => mA.zip(mB).map(m => m._1 + m._2)).toArray + }.transpose.map(_.sum) - // Calculate average metric for all folds + // Calculate average metric over all splits f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1) logInfo(s"Average cross-validation metrics: ${metrics.toSeq}") diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index d81d99df19eb..667909b96faa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -18,9 +18,10 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} -import java.util.concurrent.Semaphore import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration import scala.language.existentials import org.apache.hadoop.fs.Path @@ -34,6 +35,7 @@ import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ThreadUtils /** * Params for [[TrainValidationSplit]] and [[TrainValidationSplitModel]]. 
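The control flow of this rework is easier to see with the ML types stripped away. Below is a self-contained sketch of the same pattern (one Future per parameter setting, evaluation chained onto the fit, a single blocking wait at the end); the plain doubles and the `fit`/`evaluate` helpers are stand-ins, not patch code.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val numParallelEval = 2
val pool = Executors.newFixedThreadPool(numParallelEval)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val paramSettings = Seq(0.001, 0.1, 10.0)          // stand-in for estimatorParamMaps
def fit(p: Double): Double = p * 2.0               // stand-in for est.fit(trainingDataset, p)
def evaluate(model: Double): Double = 1.0 / model  // stand-in for eval.evaluate(...)

// One fit Future per parameter setting; evaluation is chained onto each fit,
// so a model is scored as soon as its own training finishes. The pool size
// bounds how many fits and evaluations run concurrently.
val metricFutures = paramSettings.map(p => Future(fit(p)).map(evaluate))

// Block once at the end (the patch uses ThreadUtils.awaitResult instead).
val metrics = metricFutures.map(Await.result(_, Duration.Inf))
println(metrics)
pool.shutdown()
```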
@@ -99,40 +101,53 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val est = $(estimator) val eval = $(evaluator) val epm = $(estimatorParamMaps) - // Barrier to limit parallelism during model fit/evaluation - // NOTE: will be capped by size of thread pool used in Scala parallel collections, which is - // number of cores in the system by default - val numParBarrier = new Semaphore($(numParallelEval)) - logDebug(s"Running validation with level of parallelism: ${numParBarrier.availablePermits()}.") + + // Create execution context, run in serial if numParallelEval is 1 + val executionContext = $(numParallelEval) match { + case 1 => + ThreadUtils.sameThread + case n => + ExecutionContext.fromExecutorService(executorServiceFactory(n)) + } val instr = Instrumentation.create(this, dataset) instr.logParams(trainRatio, seed) logTuningParams(instr) + logDebug(s"Running validation with level of parallelism: $numParallelEval.") val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array($(trainRatio), 1 - $(trainRatio)), $(seed)) trainingDataset.cache() validationDataset.cache() - // Fit models concurrently, limited by a barrier with '$numParallelEval' permits + // Fit models in a Future with thread-pool size determined by '$numParallelEval' logDebug(s"Train split with multiple sets of parameters.") - val models = epm.par.map { paramMap => - numParBarrier.acquire() - val model = est.fit(trainingDataset, paramMap) - numParBarrier.release() - model.asInstanceOf[Model[_]] - }.seq - trainingDataset.unpersist() + val models = epm.map { paramMap => + Future[Model[_]] { + val model = est.fit(trainingDataset, paramMap) + model.asInstanceOf[Model[_]] + } (executionContext) + } + + Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => + trainingDataset.unpersist() + } (executionContext) // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits - val metrics = models.zip(epm).par.map { case (model, paramMap) => - numParBarrier.acquire() - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - numParBarrier.release() - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - }.seq.toArray + val metricFutures = models.zip(epm).map { case (modelFuture, paramMap) => + modelFuture.flatMap { model => + Future { + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric + } (executionContext) + } (executionContext) + } + + // Wait for all metrics to be calculated + val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) + validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}") diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 404fadc1532e..debefc5ceda7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -17,10 +17,13 @@ package org.apache.spark.ml.tuning +import java.util.concurrent.ExecutorService + import org.apache.hadoop.fs.Path import org.json4s.{DefaultFormats, _} import org.json4s.jackson.JsonMethods._ +import org.apache.spark.annotation.{Experimental, InterfaceStability} 
import org.apache.spark.SparkContext import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator @@ -29,6 +32,7 @@ import org.apache.spark.ml.param.shared.HasSeed import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ThreadUtils /** * Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]]. @@ -80,6 +84,26 @@ private[ml] trait ValidatorParams extends HasSeed with Params { /** @group getParam */ def getNumParallelEval: Int = $(numParallelEval) + /** + * Creates a execution service to be used for validation, defaults to a thread-pool with + * size of `numParallelEval` + */ + protected var executorServiceFactory: (Int) => ExecutorService = { + (requestedMaxThreads: Int) => ThreadUtils.newDaemonCachedThreadPool( + s"${this.getClass.getSimpleName}-thread-pool", requestedMaxThreads) + } + + /** + * Sets a function to get an execution service to be used for validation + * + * @param getExecutorService function to get an ExecutorService given a requestedMaxThread size + */ + @Experimental + @InterfaceStability.Unstable + def setExecutorService(getExecutorService: (Int) => ExecutorService): Unit = { + executorServiceFactory = getExecutorService + } + protected def transformSchemaImpl(schema: StructType): StructType = { require($(estimatorParamMaps).nonEmpty, s"Validator requires non-empty estimatorParamMaps") val firstEstimatorParamMap = $(estimatorParamMaps).head From 9e055cd9e3c6fc507deb2db4981efe91b8be994a Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 27 Feb 2017 17:38:42 -0800 Subject: [PATCH 10/17] fixed wildcard import --- docs/ml-tuning.md | 2 +- .../main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md index 6f3b0f48292c..8c4a01c01179 100644 --- a/docs/ml-tuning.md +++ b/docs/ml-tuning.md @@ -56,7 +56,7 @@ method in each of these evaluators. To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility. Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`. -The value of `numParallelEval` should be chosen carefully to maximize concurrency without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters. +The value of `numParallelEval` should be chosen carefully to maximize parallelism without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters. 
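As a concrete illustration of the paragraph above, a minimal usage sketch with the setter this series adds; the grid values and the `training` DataFrame are hypothetical.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LogisticRegression()
val grid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.maxIter, Array(10, 50))
  .build()

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(grid)
  .setNumFolds(3)
  .setNumParallelEval(2) // evaluate up to 2 parameter settings at a time; 1 is serial
// val cvModel = cv.fit(training) // 'training' is a user-supplied DataFrame
```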
# Cross-Validation diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index debefc5ceda7..a949f697cdf1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -27,7 +27,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.SparkContext import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator -import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params, ParamValidators} import org.apache.spark.ml.param.shared.HasSeed import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata From 97ad7b4db9c31a3f433a78799edcc3978a1e701b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 11 Apr 2017 14:07:06 -0700 Subject: [PATCH 11/17] made doc changes --- docs/ml-tuning.md | 3 +-- .../main/scala/org/apache/spark/ml/tuning/CrossValidator.scala | 2 +- .../org/apache/spark/ml/tuning/TrainValidationSplit.scala | 2 +- .../scala/org/apache/spark/ml/tuning/ValidatorParams.scala | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md index 8c4a01c01179..e7fc7c7e62da 100644 --- a/docs/ml-tuning.md +++ b/docs/ml-tuning.md @@ -55,10 +55,9 @@ for multiclass problems. The default metric used to choose the best `ParamMap` c method in each of these evaluators. To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility. -Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`. +By default, sets of parameters from the parameter grid are evaluated in serial. Parameter evaluation can be done in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will be serial) before running model selection with `CrossValidator` or `TrainValidationSplit` (NOTE: this is not yet supported in Python). The value of `numParallelEval` should be chosen carefully to maximize parallelism without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters. - # Cross-Validation `CrossValidator` begins by splitting the dataset into a set of *folds* which are used as separate training and test datasets. E.g., with `$k=3$` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 `Model`s produced by fitting the `Estimator` on the 3 different (training, test) dataset pairs. 
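The per-fold averaging described here is exactly what the reworked `CrossValidator.fit` computes with its `transpose` and the `dscal` scaling; a tiny worked sketch with made-up metrics:

```scala
// Hypothetical per-fold metrics: one row per fold, one column per ParamMap.
val numFolds = 3
val foldMetrics = Seq(
  Seq(0.80, 0.60), // fold 0
  Seq(0.82, 0.58), // fold 1
  Seq(0.78, 0.62)) // fold 2

// transpose groups each ParamMap's metrics across folds; dividing the sum
// by the fold count gives avgMetrics, from which the best ParamMap is picked.
val avgMetrics = foldMetrics.transpose.map(_.sum / numFolds)
println(avgMetrics) // approximately List(0.8, 0.6)
```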
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 8894d81cfe6c..752734f7e540 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -153,7 +153,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) } (executionContext) } - // Wait for metrics to be calculated before upersisting validation dataset + // Wait for metrics to be calculated before unpersisting validation dataset val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) validationDataset.unpersist() foldMetrics diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 667909b96faa..eab1f0ad5634 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -133,7 +133,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St trainingDataset.unpersist() } (executionContext) - // Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits + // Evaluate models in a Future with thread-pool size determined by '$numParallelEval' val metricFutures = models.zip(epm).map { case (modelFuture, paramMap) => modelFuture.flatMap { model => Future { diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index a949f697cdf1..ccf5e63a3f09 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -75,7 +75,7 @@ private[ml] trait ValidatorParams extends HasSeed with Params { * param to control the number of models evaluated in parallel * Default: 1 * - * @group param + * @group expertParam */ val numParallelEval: IntParam = new IntParam(this, "numParallelEval", "max number of models to evaluate in parallel, 1 for serial evaluation", From 5e8a0869dcefaa5febf6cc354a7840225268acf9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 11 Apr 2017 17:08:18 -0700 Subject: [PATCH 12/17] changed ExecutorService factory to a trait to be compatible with Java --- .../spark/ml/tuning/CrossValidator.scala | 10 ++--- .../ml/tuning/TrainValidationSplit.scala | 10 ++--- .../spark/ml/tuning/ValidatorParams.scala | 43 +++++++++++++++---- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 752734f7e540..1dd5b09dc5bb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -109,13 +109,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val epm = $(estimatorParamMaps) val numModels = epm.length - // Create execution context, run in serial if numParallelEval is 1 - val executionContext = $(numParallelEval) match { - case 1 => - ThreadUtils.sameThread - case n => - ExecutionContext.fromExecutorService(executorServiceFactory(n)) - } + // Create execution context from the executor service factory + val executionContext = 
ExecutionContext.fromExecutorService( + executorServiceFactory.create($(numParallelEval))) val instr = Instrumentation.create(this, dataset) instr.logParams(numFolds, seed) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index eab1f0ad5634..0b788c5a1bbd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -102,13 +102,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val eval = $(evaluator) val epm = $(estimatorParamMaps) - // Create execution context, run in serial if numParallelEval is 1 - val executionContext = $(numParallelEval) match { - case 1 => - ThreadUtils.sameThread - case n => - ExecutionContext.fromExecutorService(executorServiceFactory(n)) - } + // Create execution context from the executor service factory + val executionContext = ExecutionContext.fromExecutorService( + executorServiceFactory.create($(numParallelEval))) val instr = Instrumentation.create(this, dataset) instr.logParams(trainRatio, seed) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index ccf5e63a3f09..edcdee40020f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -19,6 +19,7 @@ package org.apache.spark.ml.tuning import java.util.concurrent.ExecutorService +import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.Path import org.json4s.{DefaultFormats, _} import org.json4s.jackson.JsonMethods._ @@ -34,6 +35,23 @@ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils + +/** + * Interface for defining a custom ExecutorService factory + */ +@Experimental +@InterfaceStability.Unstable +trait ExecutorServiceFactory { + + /** + * Creates an ExecutorService + * + * @param requestedMaxThreads requested max thread-pool size, optional for the ExecutorService + * @return an instance of the ExecutorService + */ + def create(requestedMaxThreads: Int): ExecutorService +} + /** * Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]]. 
*/ @@ -86,22 +104,31 @@ private[ml] trait ValidatorParams extends HasSeed with Params { /** * Creates a execution service to be used for validation, defaults to a thread-pool with - * size of `numParallelEval` + * size of `numParallelEval`, will run same-thread executor if requested pool size is 1 */ - protected var executorServiceFactory: (Int) => ExecutorService = { - (requestedMaxThreads: Int) => ThreadUtils.newDaemonCachedThreadPool( - s"${this.getClass.getSimpleName}-thread-pool", requestedMaxThreads) + protected var executorServiceFactory: ExecutorServiceFactory = { + new ExecutorServiceFactory { + override def create(requestedMaxThreads: Int): ExecutorService = { + requestedMaxThreads match { + case 1 => + MoreExecutors.sameThreadExecutor() + case n => + ThreadUtils.newDaemonCachedThreadPool( + s"${this.getClass.getSimpleName}-thread-pool", requestedMaxThreads) + } + } + } } /** - * Sets a function to get an execution service to be used for validation + * Sets a factory to create an ExecutorService to be used for evaluation thread-pool * - * @param getExecutorService function to get an ExecutorService given a requestedMaxThread size + * @param factory instance of an ExecutorServiceFactory for using custom ExecutorService */ @Experimental @InterfaceStability.Unstable - def setExecutorService(getExecutorService: (Int) => ExecutorService): Unit = { - executorServiceFactory = getExecutorService + def setExecutorService(factory: ExecutorServiceFactory): Unit = { + executorServiceFactory = factory } protected def transformSchemaImpl(schema: StructType): StructType = { From ad8a8700306869722cb19b6b16c030b343a209c0 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 13 Jun 2017 17:57:10 -0700 Subject: [PATCH 13/17] Changed ExecutorService to be set explicitly instead of factory --- ...lectionViaTrainValidationSplitExample.java | 3 +- .../spark/ml/tuning/CrossValidator.scala | 24 ++++---- .../ml/tuning/TrainValidationSplit.scala | 24 ++++---- .../spark/ml/tuning/ValidatorParams.scala | 57 +++++++------------ 4 files changed, 46 insertions(+), 62 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java index 9a4722b90cf1..0b63326c94b4 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java @@ -70,7 +70,8 @@ public static void main(String[] args) { .setEstimator(lr) .setEvaluator(new RegressionEvaluator()) .setEstimatorParamMaps(paramGrid) - .setTrainRatio(0.8); // 80% for training and the remaining 20% for validation + .setTrainRatio(0.8) // 80% for training and the remaining 20% for validation + .setNumParallelEval(2); // Evaluate up to 2 parameter settings in parallel // Run train validation split, and choose the best set of parameters. 
TrainValidationSplitModel model = trainValidationSplit.fit(training); diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 1dd5b09dc5bb..331dd1aa33d2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -109,23 +109,23 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val epm = $(estimatorParamMaps) val numModels = epm.length - // Create execution context from the executor service factory - val executionContext = ExecutionContext.fromExecutorService( - executorServiceFactory.create($(numParallelEval))) + // Create execution context (defaults to thread-pool of size $numParallelEval) + val (executor, executorDesc) = getExecutorService + val executionContext = ExecutionContext.fromExecutorService(executor) val instr = Instrumentation.create(this, dataset) instr.logParams(numFolds, seed) logTuningParams(instr) // Compute metrics for each model over each split - logDebug(s"Running cross-validation with level of parallelism: $numParallelEval.") + logDebug(s"Running cross-validation with ExecutorService: $executorDesc.") val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed)) val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() val validationDataset = sparkSession.createDataFrame(validation, schema).cache() logDebug(s"Train split $splitIndex with multiple sets of parameters.") - // Fit models in a Future with thread-pool size determined by '$numParallelEval' + // Fit models in a Future with thread-pool size of '$numParallelEval' or custom executor val models = epm.map { paramMap => Future[Model[_]] { val model = est.fit(trainingDataset, paramMap) @@ -137,15 +137,13 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) trainingDataset.unpersist() } (executionContext) - // Evaluate models in a Future with thread-pool size determined by '$numParallelEval' + // Evaluate models in a Future with thread-pool size of '$numParallelEval' or custom executor val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) => - modelFuture.flatMap { model => - Future { - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - } (executionContext) + modelFuture.map { model => + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric } (executionContext) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 0b788c5a1bbd..c42c197b4b06 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -102,21 +102,21 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val eval = $(evaluator) val epm = $(estimatorParamMaps) - // Create execution context from the executor service factory - val executionContext = ExecutionContext.fromExecutorService( - 
executorServiceFactory.create($(numParallelEval))) + // Create execution context (defaults to thread-pool of size $numParallelEval) + val (executor, executorDesc) = getExecutorService + val executionContext = ExecutionContext.fromExecutorService(executor) val instr = Instrumentation.create(this, dataset) instr.logParams(trainRatio, seed) logTuningParams(instr) - logDebug(s"Running validation with level of parallelism: $numParallelEval.") + logDebug(s"Running validation with ExecutorService: $executorDesc.") val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array($(trainRatio), 1 - $(trainRatio)), $(seed)) trainingDataset.cache() validationDataset.cache() - // Fit models in a Future with thread-pool size determined by '$numParallelEval' + // Fit models in a Future with thread-pool size of '$numParallelEval' or custom executor logDebug(s"Train split with multiple sets of parameters.") val models = epm.map { paramMap => Future[Model[_]] { @@ -129,15 +129,13 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St trainingDataset.unpersist() } (executionContext) - // Evaluate models in a Future with thread-pool size determined by '$numParallelEval' + // Evaluate models in a Future with thread-pool size of '$numParallelEval' or custom executor val metricFutures = models.zip(epm).map { case (modelFuture, paramMap) => - modelFuture.flatMap { model => - Future { - // TODO: duplicate evaluator to take extra params from input - val metric = eval.evaluate(model.transform(validationDataset, paramMap)) - logDebug(s"Got metric $metric for model trained with $paramMap.") - metric - } (executionContext) + modelFuture.map { model => + // TODO: duplicate evaluator to take extra params from input + val metric = eval.evaluate(model.transform(validationDataset, paramMap)) + logDebug(s"Got metric $metric for model trained with $paramMap.") + metric } (executionContext) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index edcdee40020f..55dac94446e4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -36,22 +36,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils -/** - * Interface for defining a custom ExecutorService factory - */ -@Experimental -@InterfaceStability.Unstable -trait ExecutorServiceFactory { - - /** - * Creates an ExecutorService - * - * @param requestedMaxThreads requested max thread-pool size, a hint the ExecutorService may ignore - * @return an instance of the ExecutorService - */ - def create(requestedMaxThreads: Int): ExecutorService -} - /** * Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]].
*/ @@ -103,34 +87,37 @@ private[ml] trait ValidatorParams extends HasSeed with Params { def getNumParallelEval: Int = $(numParallelEval) /** - * Creates an execution service to be used for validation, defaults to a thread-pool with - * size of `numParallelEval`, will run a same-thread executor if the requested pool size is 1 + * Sets a custom ExecutorService to be used for the evaluation thread-pool + * + * @param executor the ExecutorService to use when evaluating models */ - protected var executorServiceFactory: ExecutorServiceFactory = { - new ExecutorServiceFactory { - override def create(requestedMaxThreads: Int): ExecutorService = { - requestedMaxThreads match { - case 1 => - MoreExecutors.sameThreadExecutor() - case n => - ThreadUtils.newDaemonCachedThreadPool( - s"${this.getClass.getSimpleName}-thread-pool", requestedMaxThreads) - } - } - } + @Experimental + @InterfaceStability.Unstable + def setExecutorService(executor: ExecutorService): Unit = { + customExecutor = executor } /** - * Sets a factory to create an ExecutorService to be used for the evaluation thread-pool - * - * @param factory an ExecutorServiceFactory instance used to create a custom ExecutorService + * Get an ExecutorService that will be a custom one, if previously set, or a new thread-pool + * with maximum threads set to the param `numParallelEval`. If this param is set to 1, a + * same-thread executor will be used to run in serial. */ @Experimental @InterfaceStability.Unstable - def setExecutorService(factory: ExecutorServiceFactory): Unit = { - executorServiceFactory = factory + def getExecutorService: (ExecutorService, String) = { + (Option(customExecutor), $(numParallelEval)) match { + case (Some(executor), _) => + (executor, s"Custom executor [$executor]") + case (None, 1) => + (MoreExecutors.sameThreadExecutor(), "Same-thread executor") + case (None, n) => + (ThreadUtils.newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n), + s"Thread-pool with $n threads") + } } + protected var customExecutor: ExecutorService = _ + protected def transformSchemaImpl(schema: StructType): StructType = { require($(estimatorParamMaps).nonEmpty, s"Validator requires non-empty estimatorParamMaps") val firstEstimatorParamMap = $(estimatorParamMaps).head From 911af1db14b544df8a07a9e72fa7ac46f86f5a2e Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 23 Aug 2017 11:32:59 -0700 Subject: [PATCH 14/17] added HasParallelism trait --- .../ml/param/shared/HasParallelism.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala new file mode 100644 index 000000000000..021d0b3e3416 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.param.shared + +import scala.concurrent.ExecutionContext + +import org.apache.spark.ml.param.{IntParam, Params, ParamValidators} +import org.apache.spark.util.ThreadUtils + +/** + * Trait to define a level of parallelism for algorithms that are able to use + * multithreaded execution, and provide a thread-pool based execution context. + */ +private[ml] trait HasParallelism extends Params { + + /** + * The number of threads to use when running parallel algorithms. + * Default is 1 for serial execution. + * + * @group expertParam + */ + val parallelism = new IntParam(this, "parallelism", + "the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1)) + + setDefault(parallelism -> 1) + + /** @group expertGetParam */ + def getParallelism: Int = $(parallelism) + + /** + * Create a new execution context with a thread-pool that has a maximum number of threads + * set to the value of [[parallelism]]. If this param is set to 1, a same-thread executor + * will be used to run in serial. + */ + private[ml] def getExecutionContext: ExecutionContext = { + getParallelism match { + case 1 => + ThreadUtils.sameThread + case n => + ExecutionContext.fromExecutorService(ThreadUtils + .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n)) + } + } +} From 658aacb8f4e8e7937f2bdd81074a54a60dee2037 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 23 Aug 2017 12:02:15 -0700 Subject: [PATCH 15/17] Updated to use Trait HasParallelism --- ...delSelectionViaCrossValidationExample.java | 4 +- ...lectionViaTrainValidationSplitExample.java | 4 +- ...elSelectionViaCrossValidationExample.scala | 2 +- ...ectionViaTrainValidationSplitExample.scala | 2 +- .../spark/ml/tuning/CrossValidator.scala | 24 ++++----- .../ml/tuning/TrainValidationSplit.scala | 25 ++++----- .../spark/ml/tuning/ValidatorParams.scala | 54 +------------------ .../spark/ml/tuning/CrossValidatorSuite.scala | 4 +- .../ml/tuning/TrainValidationSplitSuite.scala | 4 +- 9 files changed, 37 insertions(+), 86 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java index 975c65edc0ca..d97327969ab2 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaCrossValidationExample.java @@ -94,7 +94,9 @@ public static void main(String[] args) { CrossValidator cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new BinaryClassificationEvaluator()) - .setEstimatorParamMaps(paramGrid).setNumFolds(2); // Use 3+ in practice + .setEstimatorParamMaps(paramGrid) + .setNumFolds(2) // Use 3+ in practice + .setParallelism(2); // Evaluate up to 2 parameter settings in parallel // Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = cv.fit(training); diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java index 0b63326c94b4..2ef8bea0b2a2 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaModelSelectionViaTrainValidationSplitExample.java @@ -70,8 +70,8 @@ public static void main(String[] args) { .setEstimator(lr) .setEvaluator(new RegressionEvaluator()) .setEstimatorParamMaps(paramGrid) - .setTrainRatio(0.8) // 80% for training and the remaining 20% for validation - .setNumParallelEval(2); // Evaluate up to 2 parameter settings in parallel + .setTrainRatio(0.8) // 80% for training and the remaining 20% for validation + .setParallelism(2); // Evaluate up to 2 parameter settings in parallel // Run train validation split, and choose the best set of parameters. TrainValidationSplitModel model = trainValidationSplit.fit(training); diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala index c27748d53cfd..87d96dd51eb9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala @@ -93,7 +93,7 @@ object ModelSelectionViaCrossValidationExample { .setEvaluator(new BinaryClassificationEvaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // Use 3+ in practice - .setNumParallelEval(2) // Evaluate up to 2 parameter settings in parallel + .setParallelism(2) // Evaluate up to 2 parameter settings in parallel // Run cross-validation, and choose the best set of parameters. val cvModel = cv.fit(training) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala index 328693638be0..71e41e7298c7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala @@ -66,7 +66,7 @@ object ModelSelectionViaTrainValidationSplitExample { // 80% of the data will be used for training and the remaining 20% for validation. .setTrainRatio(0.8) // Evaluate up to 2 parameter settings in parallel - .setNumParallelEval(2) + .setParallelism(2) // Run train validation split, and choose the best set of parameters. 
val model = trainValidationSplit.fit(training) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 331dd1aa33d2..05d81307161b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future import scala.concurrent.duration.Duration import com.github.fommil.netlib.F2jBLAS @@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.HasParallelism import org.apache.spark.ml.util._ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.sql.{DataFrame, Dataset} @@ -68,7 +69,7 @@ private[ml] trait CrossValidatorParams extends ValidatorParams { @Since("1.2.0") class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) extends Estimator[CrossValidatorModel] - with CrossValidatorParams with MLWritable with Logging { + with CrossValidatorParams with HasParallelism with MLWritable with Logging { @Since("1.2.0") def this() = this(Identifiable.randomUID("cv")) @@ -95,9 +96,9 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) - /** @group setParam */ - @Since("2.2.0") - def setNumParallelEval(value: Int): this.type = set(numParallelEval, value) + /** @group expertSetParam */ + @Since("2.3.0") + def setParallelism(value: Int): this.type = set(parallelism, value) @Since("2.0.0") override def fit(dataset: Dataset[_]): CrossValidatorModel = { @@ -109,23 +110,21 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val epm = $(estimatorParamMaps) val numModels = epm.length - // Create execution context (defaults to thread-pool of size $numParallelEval) - val (executor, executorDesc) = getExecutorService - val executionContext = ExecutionContext.fromExecutorService(executor) + // Create execution context based on $(parallelism) + val executionContext = getExecutionContext val instr = Instrumentation.create(this, dataset) - instr.logParams(numFolds, seed) + instr.logParams(numFolds, seed, parallelism) logTuningParams(instr) // Compute metrics for each model over each split - logDebug(s"Running cross-validation with ExecutorService: $executorDesc.") val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed)) val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) => val trainingDataset = sparkSession.createDataFrame(training, schema).cache() val validationDataset = sparkSession.createDataFrame(validation, schema).cache() logDebug(s"Train split $splitIndex with multiple sets of parameters.") - // Fit models in a Future with thread-pool size of '$numParallelEval' or custom executor + // Fit models in a Future for training in parallel val models = epm.map { paramMap => Future[Model[_]] { val model = est.fit(trainingDataset, paramMap) @@ -133,11 +132,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) } (executionContext) } + // Unpersist training data only when all models have 
trained Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => trainingDataset.unpersist() } (executionContext) - // Evaluate models in a Future with thread-pool size of '$numParallelEval' or custom executor + // Evaluate models in a Future that will calculate a metric and allow the model to be cleaned up val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) => modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(model.transform(validationDataset, paramMap)) logDebug(s"Got metric $metric for model trained with $paramMap.") metric } (executionContext) }
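The unpersist-on-completion idiom introduced above generalizes beyond tuning. A small standalone sketch of the pattern (names are schematic, not from the patch):

    import scala.concurrent.{ExecutionContext, Future}

    // Release a shared resource only once every future has finished,
    // mirroring how trainingDataset is unpersisted above.
    def cleanUpWhenDone[T](futures: Seq[Future[T]])(cleanup: => Unit)
                          (implicit ec: ExecutionContext): Unit =
      Future.sequence(futures).onComplete(_ => cleanup)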
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index c42c197b4b06..c33aa0ad00cb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -20,7 +20,7 @@ package org.apache.spark.ml.tuning import java.util.{List => JList} import scala.collection.JavaConverters._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.language.existentials @@ -32,6 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.HasParallelism import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -65,7 +66,7 @@ private[ml] trait TrainValidationSplitParams extends ValidatorParams { @Since("1.5.0") class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: String) extends Estimator[TrainValidationSplitModel] - with TrainValidationSplitParams with MLWritable with Logging { + with TrainValidationSplitParams with HasParallelism with MLWritable with Logging { @Since("1.5.0") def this() = this(Identifiable.randomUID("tvs")) @@ -90,9 +91,9 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) - /** @group setParam */ - @Since("2.2.0") - def setNumParallelEval(value: Int): this.type = set(numParallelEval, value) + /** @group expertSetParam */ + @Since("2.3.0") + def setParallelism(value: Int): this.type = set(parallelism, value) @Since("2.0.0") override def fit(dataset: Dataset[_]): TrainValidationSplitModel = { @@ -102,21 +103,19 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val eval = $(evaluator) val epm = $(estimatorParamMaps) - // Create execution context (defaults to thread-pool of size $numParallelEval) - val (executor, executorDesc) = getExecutorService - val executionContext = ExecutionContext.fromExecutorService(executor) + // Create execution context based on $(parallelism) + val executionContext = getExecutionContext val instr = Instrumentation.create(this, dataset) - instr.logParams(trainRatio, seed) + instr.logParams(trainRatio, seed, parallelism) logTuningParams(instr) - logDebug(s"Running validation with ExecutorService: $executorDesc.") val Array(trainingDataset, validationDataset) = dataset.randomSplit(Array($(trainRatio), 1 - $(trainRatio)), $(seed)) trainingDataset.cache() validationDataset.cache() - // Fit models in a Future with thread-pool size of '$numParallelEval' or custom executor + // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") val models = epm.map { paramMap => Future[Model[_]] { @@ -125,11 +124,12 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St } (executionContext) } + // Unpersist training data only when all models have trained Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => trainingDataset.unpersist() } (executionContext) - // Evaluate models in a Future with thread-pool size of '$numParallelEval' or custom executor + // Evaluate models in a Future that will calculate a metric and allow the model to be cleaned up val metricFutures = models.zip(epm).map { case (modelFuture, paramMap) => modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input @@ -142,6 +142,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Wait for all metrics to be calculated val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) + // Unpersist validation set once all metrics have been produced validationDataset.unpersist() logInfo(s"Train validation split metrics: ${metrics.toSeq}") diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 55dac94446e4..1d1fd781f5b9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -17,23 +17,18 @@ package org.apache.spark.ml.tuning -import java.util.concurrent.ExecutorService - -import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.Path import org.json4s.{DefaultFormats, _} import org.json4s.jackson.JsonMethods._ -import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.SparkContext import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator -import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params, ParamValidators} +import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} import org.apache.spark.ml.param.shared.HasSeed import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.sql.types.StructType -import org.apache.spark.util.ThreadUtils /** @@ -73,51 +68,6 @@ private[ml] trait ValidatorParams extends HasSeed with Params { /** @group getParam */ def getEvaluator: Evaluator = $(evaluator) - /** - * param to control the number of models evaluated in parallel - * Default: 1 - * - * @group expertParam - */ - val numParallelEval: IntParam = new IntParam(this, "numParallelEval", - "max number of models to evaluate in parallel, 1 for serial evaluation", - ParamValidators.gtEq(1)) - - /** @group getParam */ - def getNumParallelEval: Int = $(numParallelEval) - - /** - * Sets a custom ExecutorService to be used for the evaluation thread-pool - * - * @param executor the ExecutorService to use when evaluating models - */ - @Experimental - @InterfaceStability.Unstable - def setExecutorService(executor: ExecutorService): Unit = { - customExecutor = executor - } - - /** - * Get an ExecutorService that will be a custom one, if previously set, or a new thread-pool - * with maximum threads set to the param `numParallelEval`. If this param is set to 1, a - * same-thread executor will be used to run in serial.
- */ - @Experimental - @InterfaceStability.Unstable - def getExecutorService: (ExecutorService, String) = { - (Option(customExecutor), $(numParallelEval)) match { - case (Some(executor), _) => - (executor, s"Custom executor [$executor]") - case (None, 1) => - (MoreExecutors.sameThreadExecutor(), "Same-thread executor") - case (None, n) => - (ThreadUtils.newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n), - s"Thread-pool with $n threads") - } - } - - protected var customExecutor: ExecutorService = _ - protected def transformSchemaImpl(schema: StructType): StructType = { require($(estimatorParamMaps).nonEmpty, s"Validator requires non-empty estimatorParamMaps") val firstEstimatorParamMap = $(estimatorParamMaps).head @@ -136,8 +86,6 @@ private[ml] trait ValidatorParams extends HasSeed with Params { instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName) instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length) } - - setDefault(numParallelEval -> 1) } private[ml] object ValidatorParams { diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 512550a7f631..e91d973d692c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -132,9 +132,9 @@ class CrossValidatorSuite .setEstimatorParamMaps(lrParamMaps) .setEvaluator(eval) .setNumFolds(2) - .setNumParallelEval(1) + .setParallelism(1) val cvSerialModel = cv.fit(dataset) - cv.setNumParallelEval(2) + cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) val serialMetrics = cvSerialModel.avgMetrics.sorted diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index ec23156e2efe..7cc4737d3eb9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -133,9 +133,9 @@ class TrainValidationSplitSuite .setEstimator(lr) .setEstimatorParamMaps(lrParamMaps) .setEvaluator(eval) - .setNumParallelEval(1) + .setParallelism(1) val cvSerialModel = cv.fit(dataset) - cv.setNumParallelEval(2) + cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) val serialMetrics = cvSerialModel.validationMetrics.sorted From 2c73b0bce559236bbe32f1b5237e2fb17ef5af58 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 23 Aug 2017 12:15:28 -0700 Subject: [PATCH 16/17] fixed up docs --- docs/ml-tuning.md | 4 ++-- .../scala/org/apache/spark/ml/tuning/CrossValidator.scala | 8 ++++++-- .../org/apache/spark/ml/tuning/TrainValidationSplit.scala | 7 ++++++- .../org/apache/spark/ml/tuning/ValidatorParams.scala | 1 - 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/docs/ml-tuning.md b/docs/ml-tuning.md index 83756c867eb3..64dc46cf0c0e 100644 --- a/docs/ml-tuning.md +++ b/docs/ml-tuning.md @@ -55,8 +55,8 @@ for multiclass problems. The default metric used to choose the best `ParamMap` c method in each of these evaluators. To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility. -By default, sets of parameters from the parameter grid are evaluated in serial. 
Parameter evaluation can be done in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will be serial) before running model selection with `CrossValidator` or `TrainValidationSplit` (NOTE: this is not yet supported in Python). -The value of `numParallelEval` should be chosen carefully to maximize parallelism without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters. +By default, sets of parameters from the parameter grid are evaluated in serial. Parameter evaluation can be done in parallel by setting `parallelism` with a value of 2 or more (a value of 1 will be serial) before running model selection with `CrossValidator` or `TrainValidationSplit` (NOTE: this is not yet supported in Python). +The value of `parallelism` should be chosen carefully to maximize parallelism without exceeding cluster resources, and larger values may not always lead to improved performance. Generally speaking, a value up to 10 should be sufficient for most clusters. # Cross-Validation diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 05d81307161b..0fd6ba27b9e1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.types.StructType import org.apache.spark.util.ThreadUtils - /** * Params for [[CrossValidator]] and [[CrossValidatorModel]]. */ @@ -96,7 +95,12 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) - /** @group expertSetParam */ + /** + * Set the maximum level of parallelism to evaluate models in parallel. + * Default is 1 for serial evaluation. + * + * @group expertSetParam + */ @Since("2.3.0") def setParallelism(value: Int): this.type = set(parallelism, value) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index c33aa0ad00cb..6ad0a0ba3feb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -91,7 +91,12 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St @Since("2.0.0") def setSeed(value: Long): this.type = set(seed, value) - /** @group expertSetParam */ + /** + * Set the maximum level of parallelism to evaluate models in parallel. + * Default is 1 for serial evaluation. + * + * @group expertSetParam + */ @Since("2.3.0") def setParallelism(value: Int): this.type = set(parallelism, value) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala index 1d1fd781f5b9..d55eb14d0345 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala @@ -30,7 +30,6 @@ import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.sql.types.StructType - /** * Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]]. */
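To make the documented `parallelism` knob concrete, here is a minimal sketch of the workflow the ml-tuning.md paragraph describes (the estimator, grid, and `training` DataFrame are illustrative stand-ins):

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
    import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

    val lr = new LogisticRegression()
    val grid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .build()
    val cv = new CrossValidator()
      .setEstimator(lr)
      .setEvaluator(new BinaryClassificationEvaluator)
      .setEstimatorParamMaps(grid)
      .setNumFolds(3)
      .setParallelism(2) // fit and evaluate up to 2 models at a time
    val cvModel = cv.fit(training) // assuming a prepared `training` DataFrame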
From 7a8221ba939da026a9818299cf2b897ea81766a5 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 5 Sep 2017 10:11:46 -0700 Subject: [PATCH 17/17] removed blas calculation for CrossValidator metric calc, was not necessary --- .../spark/ml/tuning/CrossValidator.scala | 18 +++++------------- .../spark/ml/tuning/TrainValidationSplit.scala | 9 ++++----- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 0fd6ba27b9e1..ce2a3a2e4041 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.duration.Duration -import com.github.fommil.netlib.F2jBLAS import org.apache.hadoop.fs.Path import org.json4s.DefaultFormats @@ -73,8 +72,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) @Since("1.2.0") def this() = this(Identifiable.randomUID("cv")) - private val f2jBLAS = new F2jBLAS - /** @group setParam */ @Since("1.2.0") def setEstimator(value: Estimator[_]): this.type = set(estimator, value) @@ -112,7 +109,6 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val est = $(estimator) val eval = $(evaluator) val epm = $(estimatorParamMaps) - val numModels = epm.length // Create execution context based on $(parallelism) val executionContext = getExecutionContext @@ -129,7 +125,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) logDebug(s"Train split $splitIndex with multiple sets of parameters.") // Fit models in a Future for training in parallel - val models = epm.map { paramMap => + val modelFutures = epm.map { paramMap => Future[Model[_]] { val model = est.fit(trainingDataset, paramMap) model.asInstanceOf[Model[_]] @@ -137,12 +133,11 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) } // Unpersist training data only when all models have trained - Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => - trainingDataset.unpersist() - } (executionContext) + Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext) + .onComplete { _ => trainingDataset.unpersist() } (executionContext) // Evaluate models in a Future that will calculate a metric and allow the model to be cleaned up - val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) => + val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) => modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(model.transform(validationDataset, paramMap)) @@ -155,10 +150,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf)) validationDataset.unpersist() foldMetrics - }.transpose.map(_.sum) - - // Calculate average metric over all splits - f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1) + }.transpose.map(_.sum / $(numFolds)) // Calculate average metric over all splits logInfo(s"Average cross-validation metrics: ${metrics.toSeq}") val (bestMetric, bestIndex) =
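For intuition, the dropped BLAS scaling and the new transpose form compute the same per-model averages. A tiny self-contained check (the fold metrics are made-up numbers):

    // Two folds, three models; average each model's metric across folds.
    val foldMetrics = Seq(Seq(0.80, 0.60, 0.70), Seq(0.90, 0.70, 0.50))
    val numFolds = foldMetrics.length
    val avgMetrics = foldMetrics.transpose.map(_.sum / numFolds)
    // avgMetrics == List(0.85, 0.65, 0.6), the same result dscal(numModels,
    // 1.0 / numFolds, sums, 1) would give on the per-model sums (1.7, 1.3, 1.2)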
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index 6ad0a0ba3feb..16db0f5f12c7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -122,7 +122,7 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St // Fit models in a Future for training in parallel logDebug(s"Train split with multiple sets of parameters.") - val models = epm.map { paramMap => + val modelFutures = epm.map { paramMap => Future[Model[_]] { val model = est.fit(trainingDataset, paramMap) model.asInstanceOf[Model[_]] @@ -130,12 +130,11 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St } // Unpersist training data only when all models have trained - Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ => - trainingDataset.unpersist() - } (executionContext) + Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext) + .onComplete { _ => trainingDataset.unpersist() } (executionContext) // Evaluate models in a Future that will calculate a metric and allow the model to be cleaned up - val metricFutures = models.zip(epm).map { case (modelFuture, paramMap) => + val metricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) => modelFuture.map { model => // TODO: duplicate evaluator to take extra params from input val metric = eval.evaluate(model.transform(validationDataset, paramMap))
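Taken together, both validators now share one schematic: submit one Future per ParamMap onto a bounded execution context and collect results in submission order. A standalone sketch with a placeholder trainAndEval function (not the actual MLlib signature):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
    import scala.concurrent.duration.Duration

    def tuneAll[P](paramMaps: Seq[P], parallelism: Int)(trainAndEval: P => Double): Seq[Double] = {
      implicit val ec: ExecutionContextExecutorService =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))
      val futures = paramMaps.map(p => Future(trainAndEval(p))) // one fit+evaluate per ParamMap
      try futures.map(Await.result(_, Duration.Inf)) // results keep ParamMap order
      finally ec.shutdown()
    }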