diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index 2b4b0fc55b955..253d4083de7d4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -838,7 +838,7 @@ private[ml] class FeedForwardTrainer(
    * @param data RDD of input and output vector pairs
    * @return model
    */
-  def train(data: RDD[(Vector, Vector)]): TopologyModel = {
+  def train(data: RDD[(Vector, Vector)]): (TopologyModel, Array[Double]) = {
     val w = if (getWeights == null) {
       // TODO: will make a copy if vector is a subvector of BDV (see Vectors code)
       topology.model(_seed).weights
@@ -851,9 +851,14 @@ private[ml] class FeedForwardTrainer(
     }
     val handlePersistence = trainData.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK)
-    val newWeights = optimizer.optimize(trainData, w)
+    val (newWeights, lossHistory) = optimizer match {
+      case lbfgs: LBFGS => lbfgs.optimizeWithLossReturned(trainData, w)
+      case sgd: GradientDescent => sgd.optimizeWithLossReturned(trainData, w)
+      case other => throw new UnsupportedOperationException(
+        s"Only LBFGS and GradientDescent are supported but got ${other.getClass}.")
+    }
     if (handlePersistence) trainData.unpersist()
-    topology.model(newWeights)
+    (topology.model(newWeights), lossHistory)
   }
 }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 6bffc372b68fe..e267ee72f77d9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql._
 import org.apache.spark.util.VersionUtils.majorMinorVersion
 
 /** Params for Multilayer Perceptron. */
@@ -225,8 +225,24 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
         s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
     }
     trainer.setStackSize($(blockSize))
-    val mlpModel = trainer.train(data)
-    new MultilayerPerceptronClassificationModel(uid, mlpModel.weights)
+    val (mlpModel, objectiveHistory) = trainer.train(data)
+    createModel(dataset, mlpModel.weights, objectiveHistory)
+  }
+
+  private def createModel(
+      dataset: Dataset[_],
+      weights: Vector,
+      objectiveHistory: Array[Double]): MultilayerPerceptronClassificationModel = {
+    val model = copyValues(new MultilayerPerceptronClassificationModel(uid, weights))
+
+    val (summaryModel, _, predictionColName) = model.findSummaryModel()
+    val summary = new MultilayerPerceptronClassificationTrainingSummaryImpl(
+      summaryModel.transform(dataset),
+      predictionColName,
+      $(labelCol),
+      "",
+      objectiveHistory)
+    model.setSummary(Some(summary))
   }
 }
@@ -259,7 +275,8 @@ class MultilayerPerceptronClassificationModel private[ml] (
     @Since("1.5.0") override val uid: String,
     @Since("2.0.0") val weights: Vector)
   extends ProbabilisticClassificationModel[Vector, MultilayerPerceptronClassificationModel]
-  with MultilayerPerceptronParams with Serializable with MLWritable {
+  with MultilayerPerceptronParams with Serializable with MLWritable
+  with HasTrainingSummary[MultilayerPerceptronClassificationTrainingSummary]{
 
   @Since("1.6.0")
   override lazy val numFeatures: Int = $(layers).head
@@ -268,6 +285,26 @@ class MultilayerPerceptronClassificationModel private[ml] (
     .multiLayerPerceptron($(layers), softmaxOnTop = true)
     .model(weights)
 
+  /**
+   * Gets summary of model on training set. An exception is thrown
+   * if `hasSummary` is false.
+   */
+  @Since("3.1.0")
+  override def summary: MultilayerPerceptronClassificationTrainingSummary = super.summary
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("3.1.0")
+  def evaluate(dataset: Dataset[_]): MultilayerPerceptronClassificationSummary = {
+    // Handle possible missing or invalid probability or prediction columns
+    val (summaryModel, _, predictionColName) = findSummaryModel()
+    new MultilayerPerceptronClassificationSummaryImpl(summaryModel.transform(dataset),
+      predictionColName, $(labelCol), "")
+  }
+
   /**
    * Predict label for the given features.
    * This internal method is used to implement `transform()` and output [[predictionCol]].
@@ -359,3 +396,51 @@ object MultilayerPerceptronClassificationModel
     }
   }
 }
+
+
+/**
+ * Abstraction for MultilayerPerceptronClassification results for a given model.
+ */
+sealed trait MultilayerPerceptronClassificationSummary extends ClassificationSummary
+
+/**
+ * Abstraction for MultilayerPerceptronClassification training results.
+ */
+sealed trait MultilayerPerceptronClassificationTrainingSummary
+  extends MultilayerPerceptronClassificationSummary with TrainingSummary
+
+/**
+ * MultilayerPerceptronClassification training results.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
+ */
+private class MultilayerPerceptronClassificationTrainingSummaryImpl(
+    predictions: DataFrame,
+    predictionCol: String,
+    labelCol: String,
+    weightCol: String,
+    override val objectiveHistory: Array[Double])
+  extends MultilayerPerceptronClassificationSummaryImpl(
+    predictions, predictionCol, labelCol, weightCol)
+  with MultilayerPerceptronClassificationTrainingSummary
+
+/**
+ * MultilayerPerceptronClassification results for a given model.
+ *
+ * @param predictions dataframe output by the model's `transform` method.
+ * @param predictionCol field in "predictions" which gives the prediction for a data instance as a
+ *                      double.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
+ * @param weightCol field in "predictions" which gives the weight of each instance.
+ */
+private class MultilayerPerceptronClassificationSummaryImpl(
+    @transient override val predictions: DataFrame,
+    override val predictionCol: String,
+    override val labelCol: String,
+    override val weightCol: String)
+  extends MultilayerPerceptronClassificationSummary
diff --git a/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
index 35586320cb82b..fdd6e352fa639 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
@@ -45,7 +45,7 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
     val trainer = new FeedForwardTrainer(topology, 2, 1)
     trainer.setWeights(initialWeights)
     trainer.LBFGSOptimizer.setNumIterations(20)
-    val model = trainer.train(rddData)
+    val (model, _) = trainer.train(rddData)
     val predictionAndLabels = rddData.map { case (input, label) =>
       (model.predict(input)(0), label(0))
     }.collect()
@@ -80,7 +80,7 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
     // TODO: add a test for SGD
     trainer.LBFGSOptimizer.setConvergenceTol(1e-4).setNumIterations(20)
     trainer.setWeights(initialWeights).setStackSize(1)
-    val model = trainer.train(rddData)
+    val (model, _) = trainer.train(rddData)
     val predictionAndLabels = rddData.map { case (input, label) =>
       (model.predict(input), label)
     }.collect()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
index 902af71e42f86..c909e72c689bc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -242,4 +242,36 @@ class MultilayerPerceptronClassifierSuite extends MLTest with DefaultReadWriteTe
     val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
     assert(sparkVersionStr == "2.4.4")
   }
+
+  test("summary and training summary") {
+    val mlp = new MultilayerPerceptronClassifier()
+    val model = mlp.setMaxIter(5).setLayers(Array(2, 3, 2)).fit(dataset)
+    val summary = model.evaluate(dataset)
+
+    assert(model.summary.truePositiveRateByLabel === summary.truePositiveRateByLabel)
+    assert(model.summary.falsePositiveRateByLabel === summary.falsePositiveRateByLabel)
+    assert(model.summary.precisionByLabel === summary.precisionByLabel)
+    assert(model.summary.recallByLabel === summary.recallByLabel)
+    assert(model.summary.fMeasureByLabel === summary.fMeasureByLabel)
+    assert(model.summary.accuracy === summary.accuracy)
+    assert(model.summary.weightedFalsePositiveRate === summary.weightedFalsePositiveRate)
+    assert(model.summary.weightedTruePositiveRate === summary.weightedTruePositiveRate)
+    assert(model.summary.weightedPrecision === summary.weightedPrecision)
+    assert(model.summary.weightedRecall === summary.weightedRecall)
+    assert(model.summary.weightedFMeasure === summary.weightedFMeasure)
+  }
+
+  test("MultilayerPerceptron training summary totalIterations") {
+    Seq(1, 5, 10, 20, 100).foreach { maxIter =>
+      val trainer = new MultilayerPerceptronClassifier()
+        .setMaxIter(maxIter)
+        .setLayers(Array(2, 3, 2))
+      val model = trainer.fit(dataset)
+      if (maxIter == 1) {
+        assert(model.summary.totalIterations === maxIter)
+      } else {
+        assert(model.summary.totalIterations <= maxIter)
+      }
+    }
+  }
 }
diff --git a/python/docs/source/reference/pyspark.ml.rst b/python/docs/source/reference/pyspark.ml.rst
index b6e7d10276603..00ee7b9078329 100644
--- a/python/docs/source/reference/pyspark.ml.rst
+++ b/python/docs/source/reference/pyspark.ml.rst
@@ -153,6 +153,8 @@ Classification
     NaiveBayesModel
     MultilayerPerceptronClassifier
     MultilayerPerceptronClassificationModel
+    MultilayerPerceptronClassificationSummary
+    MultilayerPerceptronClassificationTrainingSummary
     OneVsRest
     OneVsRestModel
     FMClassifier
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index e192e8c252d50..3bc9dc0628aa8 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -51,6 +51,8 @@
            'BinaryRandomForestClassificationTrainingSummary',
            'NaiveBayes', 'NaiveBayesModel',
            'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
+           'MultilayerPerceptronClassificationSummary',
+           'MultilayerPerceptronClassificationTrainingSummary',
            'OneVsRest', 'OneVsRestModel',
            'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
            'FMClassificationTrainingSummary']
@@ -2622,7 +2624,7 @@ def setSolver(self, value):
 
 
 class MultilayerPerceptronClassificationModel(_JavaProbabilisticClassificationModel,
                                               _MultilayerPerceptronParams, JavaMLWritable,
-                                              JavaMLReadable):
+                                              JavaMLReadable, HasTrainingSummary):
     """
     Model fitted by MultilayerPerceptronClassifier.
@@ -2637,6 +2639,51 @@ def weights(self):
         """
         return self._call_java("weights")
 
+    @since("3.1.0")
+    def summary(self):
+        """
+        Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
+        trained on the training set. An exception is thrown if `trainingSummary is None`.
+        """
+        if self.hasSummary:
+            return MultilayerPerceptronClassificationTrainingSummary(
+                super(MultilayerPerceptronClassificationModel, self).summary)
+        else:
+            raise RuntimeError("No training summary available for this %s" %
+                               self.__class__.__name__)
+
+    @since("3.1.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+
+        :param dataset:
+          Test dataset to evaluate model on, where dataset is an
+          instance of :py:class:`pyspark.sql.DataFrame`
+        """
+        if not isinstance(dataset, DataFrame):
+            raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
+        java_mlp_summary = self._call_java("evaluate", dataset)
+        return MultilayerPerceptronClassificationSummary(java_mlp_summary)
+
+
+class MultilayerPerceptronClassificationSummary(_ClassificationSummary):
+    """
+    Abstraction for MultilayerPerceptronClassifier Results for a given model.
+    .. versionadded:: 3.1.0
+    """
+    pass
+
+
+@inherit_doc
+class MultilayerPerceptronClassificationTrainingSummary(MultilayerPerceptronClassificationSummary,
+                                                        _TrainingSummary):
+    """
+    Abstraction for MultilayerPerceptronClassifier Training results.
+    .. versionadded:: 3.1.0
+    """
+    pass
+
 
 class _OneVsRestParams(_ClassifierParams, HasWeightCol):
     """
diff --git a/python/pyspark/ml/tests/test_training_summary.py b/python/pyspark/ml/tests/test_training_summary.py
index d305be8b96cd4..6b05ffaa7d52f 100644
--- a/python/pyspark/ml/tests/test_training_summary.py
+++ b/python/pyspark/ml/tests/test_training_summary.py
@@ -18,11 +18,12 @@
 import sys
 import unittest
 
-from pyspark.ml.classification import BinaryLogisticRegressionSummary, FMClassifier, \
+from pyspark.ml.classification import BinaryLogisticRegressionSummary, \
+    BinaryRandomForestClassificationSummary, FMClassifier, \
     FMClassificationSummary, LinearSVC, LinearSVCSummary, \
-    BinaryRandomForestClassificationSummary, LogisticRegression, \
-    LogisticRegressionSummary, RandomForestClassificationSummary, \
-    RandomForestClassifier
+    LogisticRegression, LogisticRegressionSummary, \
+    MultilayerPerceptronClassifier, MultilayerPerceptronClassificationSummary, \
+    RandomForestClassificationSummary, RandomForestClassifier
 from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans
 from pyspark.ml.linalg import Vectors
 from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
@@ -354,6 +355,42 @@ def test_fm_classification_summary(self):
         self.assertTrue(isinstance(sameSummary, FMClassificationSummary))
         self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
 
+    def test_mlp_classification_summary(self):
+        df = self.spark.createDataFrame([(0.0, Vectors.dense([0.0, 0.0])),
+                                         (1.0, Vectors.dense([0.0, 1.0])),
+                                         (1.0, Vectors.dense([1.0, 0.0])),
+                                         (0.0, Vectors.dense([1.0, 1.0]))
+                                         ],
+                                        ["label", "features"])
+        mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
+        model = mlp.fit(df)
+        self.assertTrue(model.hasSummary)
+        s = model.summary()
+        # test that api is callable and returns expected types
+        self.assertTrue(isinstance(s.predictions, DataFrame))
+        self.assertEqual(s.labelCol, "label")
+        self.assertEqual(s.predictionCol, "prediction")
+        self.assertGreater(s.totalIterations, 0)
+        self.assertTrue(isinstance(s.labels, list))
+        self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
+        self.assertTrue(isinstance(s.precisionByLabel, list))
+        self.assertTrue(isinstance(s.recallByLabel, list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(), list))
+        self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
+        self.assertAlmostEqual(s.accuracy, 1.0, 2)
+        self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
+        self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
+        self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
+        self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
+        self.assertAlmostEqual(s.weightedFMeasure(1.0), 1.0, 2)
+        # test evaluation (with training dataset) produces a summary with same values
+        # one check is enough to verify a summary is returned, Scala version runs full test
+        sameSummary = model.evaluate(df)
+        self.assertTrue(isinstance(sameSummary, MultilayerPerceptronClassificationSummary))
+        self.assertAlmostEqual(sameSummary.accuracy, s.accuracy)
+
     def test_gaussian_mixture_summary(self):
         data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),),
                 (Vectors.dense(10.0),), (Vectors.sparse(1, [], []),)]
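
For reference, the summary API introduced by this patch can be exercised roughly as follows. This is a minimal Scala sketch mirroring the new Scala test above, not part of the patch itself; `dataset` is assumed to be a DataFrame with "features" and "label" columns.

    import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

    // Fit a small MLP; the trainer now also returns the optimizer's loss history,
    // which fit() attaches to the model as a training summary.
    val mlp = new MultilayerPerceptronClassifier()
      .setLayers(Array(2, 3, 2))
      .setMaxIter(5)
    val model = mlp.fit(dataset)

    // Training summary computed on the training data during fit().
    println(model.summary.totalIterations)
    println(model.summary.accuracy)

    // Evaluate on a (possibly different) dataset; returns a plain classification summary.
    val testSummary = model.evaluate(dataset)
    println(testSummary.weightedFMeasure)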