diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala
index 87d96dd51eb94..40a3625d36acd 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaCrossValidationExample.scala
@@ -113,6 +113,7 @@ object ModelSelectionViaCrossValidationExample {
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
         println(s"($id, $text) --> prob=$prob, prediction=$prediction")
       }
+    cvModel.tuningSummary.show()
     // $example off$
 
     spark.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala
index 71e41e7298c73..39ccad7d4d86a 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/ModelSelectionViaTrainValidationSplitExample.scala
@@ -76,6 +76,7 @@ object ModelSelectionViaTrainValidationSplitExample {
     model.transform(test)
       .select("features", "label", "prediction")
       .show()
+    model.tuningSummary.show()
     // $example off$
 
     spark.stop()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
index e60a14f976a5c..abdc59d6a06df 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala
@@ -309,6 +309,13 @@ class CrossValidatorModel private[ml] (
     bestModel.transformSchema(schema)
   }
 
+  /**
+   * Summary of grid search tuning as a DataFrame. Each row contains one candidate
+   * paramMap and the corresponding metric of the trained model.
+   */
+  @Since("2.3.0")
+  lazy val tuningSummary: DataFrame = this.getTuningSummaryDF(avgMetrics)
+
   @Since("1.4.0")
   override def copy(extra: ParamMap): CrossValidatorModel = {
     val copied = new CrossValidatorModel(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
index 8b251197afbef..40f776a1e7a60 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala
@@ -300,6 +300,13 @@ class TrainValidationSplitModel private[ml] (
     bestModel.transformSchema(schema)
   }
 
+  /**
+   * Summary of grid search tuning as a DataFrame. Each row contains one candidate
+   * paramMap and the corresponding metric of the trained model.
+   */
+  @Since("2.3.0")
+  lazy val tuningSummary: DataFrame = this.getTuningSummaryDF(validationMetrics)
+
   @Since("1.5.0")
   override def copy(extra: ParamMap): TrainValidationSplitModel = {
     val copied = new TrainValidationSplitModel (
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala
index 135828815504a..971499235e7f7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/ValidatorParams.scala
@@ -22,13 +22,14 @@ import org.json4s.{DefaultFormats, _}
 import org.json4s.jackson.JsonMethods._
 import org.apache.spark.SparkContext
-import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.evaluation.Evaluator
+import org.apache.spark.ml.{Estimator, Model, Pipeline}
+import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
 import org.apache.spark.ml.param.shared.HasSeed
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
 
 /**
  * Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]].
  */
@@ -85,6 +86,54 @@ private[ml] trait ValidatorParams extends HasSeed with Params {
     instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName)
     instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length)
   }
+
+
+  /**
+   * @return Summary of grid search tuning as a DataFrame. Each row contains one
+   *         candidate paramMap and the corresponding metric of the trained model.
+   */
+  protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
+    val paramMaps = $(estimatorParamMaps)
+    require(paramMaps.nonEmpty, "estimator param maps should not be empty")
+    require(paramMaps.length == metrics.length, "estimator param maps number should match metrics")
+    val metricName = $(evaluator) match {
+      case b: BinaryClassificationEvaluator => b.getMetricName
+      case m: MulticlassClassificationEvaluator => m.getMetricName
+      case r: RegressionEvaluator => r.getMetricName
+      case _ => "metrics"
+    }
+    val spark = SparkSession.builder().getOrCreate()
+    val sc = spark.sparkContext
+
+    // Collect all tuned params, since different paramMaps may contain different sets of params.
+    val tuningParamPairs = paramMaps.flatMap(map => map.toSeq)
+    val tuningParams = tuningParamPairs.map(_.param.asInstanceOf[Param[Any]]).distinct
+    val schema = new StructType(tuningParams.map(p => StructField(p.toString, StringType))
+      ++ Array(StructField(metricName, DoubleType)))
+
+    // Get each param's value from the paramMap, or its default value if the paramMap omits it.
+    val rows = paramMaps.zip(metrics).map { case (pMap, metric) =>
+      val est = $(estimator).copy(pMap)
+      val values = tuningParams.map { param =>
+        est match {
+          // If est is a Pipeline, look up the param value in its stages.
+          case pipeline: Pipeline =>
+            val candidates = pipeline.getStages.flatMap { stage =>
+              stage.extractParamMap().get(param)
+            }
+            if (candidates.nonEmpty) {
+              param.jsonEncode(candidates.head)
+            } else {
+              param.jsonEncode(est.getOrDefault(param))
+            }
+          case _ =>
+            param.jsonEncode(est.getOrDefault(param))
+        }
+      } ++ Seq(metric)
+      Row.fromSeq(values)
+    }
+    spark.createDataFrame(sc.parallelize(rows), schema)
+  }
 }
 
 private[ml] object ValidatorParams {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
index e6ee7220d2279..c313134044e37 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
@@ -24,14 +24,14 @@ import org.apache.spark.ml.{Estimator, Model, Pipeline}
 import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
 import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
 import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
-import org.apache.spark.ml.feature.HashingTF
+import org.apache.spark.ml.feature.{HashingTF, MinMaxScaler}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared.HasInputCol
 import org.apache.spark.ml.regression.LinearRegression
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
-import org.apache.spark.mllib.util.LinearDataGenerator
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTest, MLTestingUtils}
+import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.types.StructType
 
 class CrossValidatorSuite
@@ -75,6 +75,73 @@ class CrossValidatorSuite
     }
   }
 
+  test("cross validation with tuning summary") {
+    val lr = new LogisticRegression
+    val lrParamMaps: Array[ParamMap] = new ParamGridBuilder()
+      .addGrid(lr.regParam, Array(0.001, 1.0, 1000.0))
+      .addGrid(lr.maxIter, Array(0, 2))
+      .build()
+    val eval = new BinaryClassificationEvaluator
+    val cv = new CrossValidator()
+      .setEstimator(lr)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+      .setNumFolds(3)
+    val cvModel = cv.fit(dataset)
+    val expected = lrParamMaps.zip(cvModel.avgMetrics).map { case (map, metric) =>
+      Row.fromSeq(map.toSeq.map(_.value.toString) ++ Seq(metric))
+    }
+    assert(cvModel.tuningSummary.collect().toSet === expected.toSet)
+    assert(cvModel.tuningSummary.columns.last === eval.getMetricName)
+  }
+
+  test("CrossValidation tuningSummary with Pipeline") {
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
+    val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled")
+    val lr = new LogisticRegression().setFeaturesCol("scaled")
+    val pipeline = new Pipeline().setStages(Array(scaler, lr))
+
+    val lrParamMaps = new ParamGridBuilder()
+      .addGrid(lr.regParam, Array(0.001, 1.0))
+      .addGrid(lr.maxIter, Array(0, 2))
+      .addGrid(scaler.min, Array(0.0, -1.0))
+      .build()
+    val eval = new BinaryClassificationEvaluator
+    val cv = new CrossValidator()
+      .setEstimator(pipeline)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+      .setNumFolds(3)
+    val cvModel = cv.fit(dataset)
+    val expected = lrParamMaps.zip(cvModel.avgMetrics).map { case (map, metric) =>
+      Row.fromSeq(map.toSeq.map(_.value.toString) ++ Seq(metric))
+    }
+    assert(cvModel.tuningSummary.collect().toSet === expected.toSet)
+    assert(cvModel.tuningSummary.columns.last === eval.getMetricName)
+    cvModel.tuningSummary.show()
+  }
+
+  test("CV tuningSummary with non-overlapping params") {
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
+    val lr = new LogisticRegression
+    val lrParamMaps = Array(new ParamMap().put(lr.maxIter, 0),
+      new ParamMap().put(lr.regParam, 0.01))
+    val eval = new BinaryClassificationEvaluator
+    val cv = new CrossValidator()
+      .setEstimator(lr)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+      .setNumFolds(3)
+    val cvModel = cv.fit(dataset)
+    val expected = Seq((0, 0.0, cvModel.avgMetrics(0)),
+      (lr.getOrDefault(lr.maxIter), 0.01, cvModel.avgMetrics(1))
+    ).map(t => (t._1.toString, t._2.toString, t._3))
+      .toDF(lr.maxIter.name, lr.regParam.name, eval.getMetricName)
+
+    assert(cvModel.tuningSummary.collect().toSet === expected.collect().toSet)
+    assert(cvModel.tuningSummary.columns.last === eval.getMetricName)
+  }
+
   test("cross validation with linear regression") {
     val dataset = sc.parallelize(
       LinearDataGenerator.generateLinearInput(
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
index cd76acf9c67bc..a55b8e0040be7 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
@@ -20,17 +20,18 @@ package org.apache.spark.ml.tuning
 import java.io.File
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.{Estimator, Model, Pipeline}
 import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
 import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
 import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator}
+import org.apache.spark.ml.feature.MinMaxScaler
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared.HasInputCol
 import org.apache.spark.ml.regression.LinearRegression
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
-import org.apache.spark.mllib.util.LinearDataGenerator
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTest, MLTestingUtils}
+import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
+import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.types.StructType
 
 class TrainValidationSplitSuite
@@ -73,6 +74,68 @@ class TrainValidationSplitSuite
     }
   }
 
+  test("train validation split tuningSummary") {
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
+    val lr = new LogisticRegression
+    val lrParamMaps = new ParamGridBuilder()
+      .addGrid(lr.regParam, Array(0.001, 1.0, 1000.0))
+      .addGrid(lr.maxIter, Array(0, 2))
+      .build()
+    val eval = new BinaryClassificationEvaluator
+    val tvs = new TrainValidationSplit()
+      .setEstimator(lr)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+    val tvsModel = tvs.fit(dataset)
+    val expected = lrParamMaps.zip(tvsModel.validationMetrics).map { case (map, metric) =>
+      Row.fromSeq(map.toSeq.map(_.value.toString) ++ Seq(metric))
+    }
+    assert(tvsModel.tuningSummary.collect().toSet === expected.toSet)
+    assert(tvsModel.tuningSummary.columns.last === eval.getMetricName)
+  }
+
+  test("tuningSummary with non-overlapping params") {
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
+    val lr = new LogisticRegression
+    val lrParamMaps = Array(new ParamMap().put(lr.maxIter, 0),
+      new ParamMap().put(lr.regParam, 0.01))
+    val eval = new BinaryClassificationEvaluator
+    val tvs = new TrainValidationSplit()
+      .setEstimator(lr)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+    val tvsModel = tvs.fit(dataset)
+    val expected = Seq((0, lr.getOrDefault(lr.regParam), tvsModel.validationMetrics(0)),
+      (lr.getOrDefault(lr.maxIter), 0.01, tvsModel.validationMetrics(1))
+    ).map(t => (t._1.toString, t._2.toString, t._3))
+      .toDF(lr.maxIter.name, lr.regParam.name, eval.getMetricName)
+
+    assert(tvsModel.tuningSummary.collect().toSet === expected.collect().toSet)
+    assert(tvsModel.tuningSummary.columns.last === eval.getMetricName)
+  }
+
+  test("train validation split tuningSummary with Pipeline") {
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
+    val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled")
+    val lr = new LogisticRegression().setFeaturesCol("scaled")
+    val pipeline = new Pipeline().setStages(Array(scaler, lr))
+
+    val lrParamMaps = Array(new ParamMap().put(lr.maxIter, 0),
+      new ParamMap().put(scaler.min, -1.0))
+    val eval = new BinaryClassificationEvaluator
+    val tvs = new TrainValidationSplit()
+      .setEstimator(pipeline)
+      .setEstimatorParamMaps(lrParamMaps)
+      .setEvaluator(eval)
+    val tvsModel = tvs.fit(dataset)
+    val expected = Seq((0, scaler.getOrDefault(scaler.min), tvsModel.validationMetrics(0)),
+      (lr.getOrDefault(lr.maxIter), -1.0, tvsModel.validationMetrics(1))
+    ).map(t => (t._1.toString, t._2.toString, t._3))
+      .toDF(lr.maxIter.name, scaler.min.name, eval.getMetricName)
+    assert(tvsModel.tuningSummary.collect().toSet === expected.collect().toSet)
+    assert(tvsModel.tuningSummary.columns.last === eval.getMetricName)
+  }
+
   test("train validation with linear regression") {
     val dataset = sc.parallelize(
       LinearDataGenerator.generateLinearInput(
@@ -100,7 +163,7 @@ class TrainValidationSplitSuite
     assert(parent.getMaxIter === 10)
     assert(tvsModel.validationMetrics.length === lrParamMaps.length)
 
-    eval.setMetricName("r2")
+    eval.setMetricName("r2")
     val tvsModel2 = tvs.fit(dataset)
     val parent2 = tvsModel2.bestModel.parent.asInstanceOf[LinearRegression]
     assert(parent2.getRegParam === 0.001)
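Usage note (not part of the patch): the snippet below is a minimal sketch of how the new tuningSummary API is meant to be called, following the example changes above. It assumes an existing SparkSession and a DataFrame named "training" with "label" and "features" columns; those names, and the grid values, are illustrative only.

// Minimal usage sketch; "training" is an assumed input DataFrame, not defined in this patch.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LogisticRegression()
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .addGrid(lr.maxIter, Array(10, 50))
  .build()

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setNumFolds(3)

val cvModel = cv.fit(training)

// Per getTuningSummaryDF above: one row per candidate ParamMap, param columns keyed by
// Param.toString, and the last column named after the evaluator's metric
// (for BinaryClassificationEvaluator, "areaUnderROC" by default).
cvModel.tuningSummary.show()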