11 changes: 8 additions & 3 deletions mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -838,7 +838,7 @@ private[ml] class FeedForwardTrainer(
* @param data RDD of input and output vector pairs
* @return model
*/
def train(data: RDD[(Vector, Vector)]): TopologyModel = {
def train(data: RDD[(Vector, Vector)]): (TopologyModel, Array[Double]) = {
val w = if (getWeights == null) {
// TODO: will make a copy if vector is a subvector of BDV (see Vectors code)
topology.model(_seed).weights
@@ -851,9 +851,14 @@ }
}
val handlePersistence = trainData.getStorageLevel == StorageLevel.NONE
if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK)
val newWeights = optimizer.optimize(trainData, w)
val (newWeights, lossHistory) = optimizer match {
case lbfgs: LBFGS => lbfgs.optimizeWithLossReturned(trainData, w)
case sgd: GradientDescent => sgd.optimizeWithLossReturned(trainData, w)
case other => throw new UnsupportedOperationException(
s"Only LBFGS and GradientDescent are supported but got ${other.getClass}.")
}
if (handlePersistence) trainData.unpersist()
topology.model(newWeights)
(topology.model(newWeights), lossHistory)
}

}
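
For orientation, a hedged sketch of the caller-side effect of this change: `FeedForwardTrainer.train` now returns the fitted `TopologyModel` together with the per-iteration loss history rather than the model alone. `FeedForwardTrainer`, `FeedForwardTopology`, and `LBFGSOptimizer` are the existing private[ml] APIs touched here; the layer sizes, iteration count, and the `fitAndReport` helper are illustrative assumptions, not part of this diff.

// Sketch only: FeedForwardTrainer is private[ml], so code like this must live
// inside org.apache.spark.ml, as MultilayerPerceptronClassifier does.
import org.apache.spark.ml.ann.{FeedForwardTopology, FeedForwardTrainer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

def fitAndReport(data: RDD[(Vector, Vector)]): Unit = {
  // 2 inputs -> hidden layer of 3 -> 2 softmax outputs (illustrative sizes).
  val topology = FeedForwardTopology.multiLayerPerceptron(Array(2, 3, 2), softmaxOnTop = true)
  val trainer = new FeedForwardTrainer(topology, 2, 2)
  trainer.LBFGSOptimizer.setNumIterations(20)

  // Before this change: val model = trainer.train(data)
  // After: the per-iteration loss history comes back alongside the model.
  val (model, lossHistory) = trainer.train(data)
  println(s"ran ${lossHistory.length} iterations; " +
    s"final loss = ${lossHistory.lastOption.getOrElse(Double.NaN)}")
  println(s"sample prediction: ${model.predict(data.first()._1)}")
}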
mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql._
import org.apache.spark.util.VersionUtils.majorMinorVersion

/** Params for Multilayer Perceptron. */
@@ -225,8 +225,24 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
}
trainer.setStackSize($(blockSize))
val mlpModel = trainer.train(data)
new MultilayerPerceptronClassificationModel(uid, mlpModel.weights)
val (mlpModel, objectiveHistory) = trainer.train(data)
createModel(dataset, mlpModel.weights, objectiveHistory)
}

private def createModel(
dataset: Dataset[_],
weights: Vector,
objectiveHistory: Array[Double]): MultilayerPerceptronClassificationModel = {
val model = copyValues(new MultilayerPerceptronClassificationModel(uid, weights))

val (summaryModel, _, predictionColName) = model.findSummaryModel()
val summary = new MultilayerPerceptronClassificationTrainingSummaryImpl(
summaryModel.transform(dataset),
predictionColName,
$(labelCol),
"",
objectiveHistory)
model.setSummary(Some(summary))
}
}

@@ -259,7 +275,8 @@ class MultilayerPerceptronClassificationModel private[ml] (
@Since("1.5.0") override val uid: String,
@Since("2.0.0") val weights: Vector)
extends ProbabilisticClassificationModel[Vector, MultilayerPerceptronClassificationModel]
with MultilayerPerceptronParams with Serializable with MLWritable {
with MultilayerPerceptronParams with Serializable with MLWritable
with HasTrainingSummary[MultilayerPerceptronClassificationTrainingSummary] {

@Since("1.6.0")
override lazy val numFeatures: Int = $(layers).head
@@ -268,6 +285,26 @@ class MultilayerPerceptronClassificationModel private[ml] (
.multiLayerPerceptron($(layers), softmaxOnTop = true)
.model(weights)

/**
* Gets summary of model on training set. An exception is thrown
* if `hasSummary` is false.
*/
@Since("3.1.0")
override def summary: MultilayerPerceptronClassificationTrainingSummary = super.summary

/**
* Evaluates the model on a test dataset.
*
* @param dataset Test dataset to evaluate model on.
*/
@Since("3.1.0")
def evaluate(dataset: Dataset[_]): MultilayerPerceptronClassificationSummary = {
// Handle possible missing or invalid probability or prediction columns
val (summaryModel, _, predictionColName) = findSummaryModel()
new MultilayerPerceptronClassificationSummaryImpl(summaryModel.transform(dataset),
predictionColName, $(labelCol), "")
}

/**
* Predict label for the given features.
* This internal method is used to implement `transform()` and output [[predictionCol]].
@@ -359,3 +396,51 @@ object MultilayerPerceptronClassificationModel
}
}
}


/**
* Abstraction for MultilayerPerceptronClassification results for a given model.
*/
sealed trait MultilayerPerceptronClassificationSummary extends ClassificationSummary

/**
* Abstraction for MultilayerPerceptronClassification training results.
*/
sealed trait MultilayerPerceptronClassificationTrainingSummary
extends MultilayerPerceptronClassificationSummary with TrainingSummary

/**
* MultilayerPerceptronClassification training results.
*
* @param predictions dataframe output by the model's `transform` method.
* @param predictionCol field in "predictions" which gives the prediction for a data instance as a
* double.
* @param labelCol field in "predictions" which gives the true label of each instance.
* @param weightCol field in "predictions" which gives the weight of each instance.
* @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
*/
private class MultilayerPerceptronClassificationTrainingSummaryImpl(
predictions: DataFrame,
predictionCol: String,
labelCol: String,
weightCol: String,
override val objectiveHistory: Array[Double])
extends MultilayerPerceptronClassificationSummaryImpl(
predictions, predictionCol, labelCol, weightCol)
with MultilayerPerceptronClassificationTrainingSummary

/**
* MultilayerPerceptronClassification results for a given model.
*
* @param predictions dataframe output by the model's `transform` method.
* @param predictionCol field in "predictions" which gives the prediction for a data instance as a
* double.
* @param labelCol field in "predictions" which gives the true label of each instance.
* @param weightCol field in "predictions" which gives the weight of each instance.
*/
private class MultilayerPerceptronClassificationSummaryImpl(
@transient override val predictions: DataFrame,
override val predictionCol: String,
override val labelCol: String,
override val weightCol: String)
extends MultilayerPerceptronClassificationSummary
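
End to end, the Scala additions above surface the loss history and classification metrics directly on the fitted model. A minimal usage sketch, assuming `training` and `test` are DataFrames with the default "features" and "label" columns for a 4-feature, 3-class problem (those specifics, and the `report` helper, are assumptions, not part of this diff):

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.sql.DataFrame

def report(training: DataFrame, test: DataFrame): Unit = {
  val mlp = new MultilayerPerceptronClassifier()
    .setLayers(Array(4, 5, 3))  // input, hidden, and output layer sizes (assumed)
    .setMaxIter(100)
  val model = mlp.fit(training)

  // New with this change: a training summary backed by the returned loss history.
  val ts = model.summary
  println(s"iterations: ${ts.totalIterations}")
  println(s"objective history: ${ts.objectiveHistory.mkString(", ")}")
  println(s"training accuracy: ${ts.accuracy}")

  // evaluate() computes the same classification metrics on held-out data.
  val es = model.evaluate(test)
  println(s"test accuracy: ${es.accuracy}, weighted F1: ${es.weightedFMeasure}")
}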
4 changes: 2 additions & 2 deletions mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala
@@ -45,7 +45,7 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
val trainer = new FeedForwardTrainer(topology, 2, 1)
trainer.setWeights(initialWeights)
trainer.LBFGSOptimizer.setNumIterations(20)
val model = trainer.train(rddData)
val (model, _) = trainer.train(rddData)
val predictionAndLabels = rddData.map { case (input, label) =>
(model.predict(input)(0), label(0))
}.collect()
@@ -80,7 +80,7 @@ class ANNSuite extends SparkFunSuite with MLlibTestSparkContext {
// TODO: add a test for SGD
trainer.LBFGSOptimizer.setConvergenceTol(1e-4).setNumIterations(20)
trainer.setWeights(initialWeights).setStackSize(1)
val model = trainer.train(rddData)
val (model, _) = trainer.train(rddData)
val predictionAndLabels = rddData.map { case (input, label) =>
(model.predict(input), label)
}.collect()
mllib/src/test/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifierSuite.scala
@@ -242,4 +242,36 @@ class MultilayerPerceptronClassifierSuite extends MLTest with DefaultReadWriteTest {
val sparkVersionStr = metadata.select("sparkVersion").first().getString(0)
assert(sparkVersionStr == "2.4.4")
}

test("summary and training summary") {
val mlp = new MultilayerPerceptronClassifier()
val model = mlp.setMaxIter(5).setLayers(Array(2, 3, 2)).fit(dataset)
val summary = model.evaluate(dataset)

assert(model.summary.truePositiveRateByLabel === summary.truePositiveRateByLabel)
assert(model.summary.falsePositiveRateByLabel === summary.falsePositiveRateByLabel)
assert(model.summary.precisionByLabel === summary.precisionByLabel)
assert(model.summary.recallByLabel === summary.recallByLabel)
assert(model.summary.fMeasureByLabel === summary.fMeasureByLabel)
assert(model.summary.accuracy === summary.accuracy)
assert(model.summary.weightedFalsePositiveRate === summary.weightedFalsePositiveRate)
assert(model.summary.weightedTruePositiveRate === summary.weightedTruePositiveRate)
assert(model.summary.weightedPrecision === summary.weightedPrecision)
assert(model.summary.weightedRecall === summary.weightedRecall)
assert(model.summary.weightedFMeasure === summary.weightedFMeasure)
}

test("MultilayerPerceptron training summary totalIterations") {
Seq(1, 5, 10, 20, 100).foreach { maxIter =>
val trainer = new MultilayerPerceptronClassifier()
.setMaxIter(maxIter)
.setLayers(Array(2, 3, 2))
val model = trainer.fit(dataset)
if (maxIter == 1) {
assert(model.summary.totalIterations === maxIter)
} else {
assert(model.summary.totalIterations <= maxIter)
}
}
}
}
2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.ml.rst
@@ -153,6 +153,8 @@ Classification
NaiveBayesModel
MultilayerPerceptronClassifier
MultilayerPerceptronClassificationModel
MultilayerPerceptronClassificationSummary
MultilayerPerceptronClassificationTrainingSummary
OneVsRest
OneVsRestModel
FMClassifier
49 changes: 48 additions & 1 deletion python/pyspark/ml/classification.py
@@ -51,6 +51,8 @@
'BinaryRandomForestClassificationTrainingSummary',
'NaiveBayes', 'NaiveBayesModel',
'MultilayerPerceptronClassifier', 'MultilayerPerceptronClassificationModel',
'MultilayerPerceptronClassificationSummary',
'MultilayerPerceptronClassificationTrainingSummary',
Review comment (Member): @huaxingao, are these going to be documented? Might be good to list at pyspark.ml.rst too if so. The documentation PR (#29188) was merged.

Reply (Contributor Author): @HyukjinKwon updated pyspark.ml.rst. Thanks!

'OneVsRest', 'OneVsRestModel',
'FMClassifier', 'FMClassificationModel', 'FMClassificationSummary',
'FMClassificationTrainingSummary']
@@ -2622,7 +2624,7 @@ def setSolver(self, value):

class MultilayerPerceptronClassificationModel(_JavaProbabilisticClassificationModel,
_MultilayerPerceptronParams, JavaMLWritable,
JavaMLReadable):
JavaMLReadable, HasTrainingSummary):
"""
Model fitted by MultilayerPerceptronClassifier.

@@ -2637,6 +2639,51 @@ def weights(self):
"""
return self._call_java("weights")

@since("3.1.0")
def summary(self):
"""
Gets summary (e.g. accuracy/precision/recall, objective history, total iterations) of model
trained on the training set. An exception is thrown if `trainingSummary is None`.
"""
if self.hasSummary:
return MultilayerPerceptronClassificationTrainingSummary(
super(MultilayerPerceptronClassificationModel, self).summary)
else:
raise RuntimeError("No training summary available for this %s" %
self.__class__.__name__)

@since("3.1.0")
def evaluate(self, dataset):
"""
Evaluates the model on a test dataset.

:param dataset:
Test dataset to evaluate model on, where dataset is an
instance of :py:class:`pyspark.sql.DataFrame`
"""
if not isinstance(dataset, DataFrame):
raise ValueError("dataset must be a DataFrame but got %s." % type(dataset))
java_mlp_summary = self._call_java("evaluate", dataset)
return MultilayerPerceptronClassificationSummary(java_mlp_summary)


class MultilayerPerceptronClassificationSummary(_ClassificationSummary):
"""
Abstraction for MultilayerPerceptronClassifier Results for a given model.

.. versionadded:: 3.1.0
"""
pass


@inherit_doc
class MultilayerPerceptronClassificationTrainingSummary(MultilayerPerceptronClassificationSummary,
_TrainingSummary):
"""
Abstraction for MultilayerPerceptronClassifier Training results.

.. versionadded:: 3.1.0
"""
pass


class _OneVsRestParams(_ClassifierParams, HasWeightCol):
"""
45 changes: 41 additions & 4 deletions python/pyspark/ml/tests/test_training_summary.py
@@ -18,11 +18,12 @@
import sys
import unittest

from pyspark.ml.classification import BinaryLogisticRegressionSummary, FMClassifier, \
from pyspark.ml.classification import BinaryLogisticRegressionSummary, \
BinaryRandomForestClassificationSummary, FMClassifier, \
FMClassificationSummary, LinearSVC, LinearSVCSummary, \
BinaryRandomForestClassificationSummary, LogisticRegression, \
LogisticRegressionSummary, RandomForestClassificationSummary, \
RandomForestClassifier
LogisticRegression, LogisticRegressionSummary, \
MultilayerPerceptronClassifier, MultilayerPerceptronClassificationSummary, \
RandomForestClassificationSummary, RandomForestClassifier
from pyspark.ml.clustering import BisectingKMeans, GaussianMixture, KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
@@ -354,6 +355,42 @@ def test_fm_classification_summary(self):
self.assertTrue(isinstance(sameSummary, FMClassificationSummary))
self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)

def test_mlp_classification_summary(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense([0.0, 0.0])),
(1.0, Vectors.dense([0.0, 1.0])),
(1.0, Vectors.dense([1.0, 0.0])),
(0.0, Vectors.dense([1.0, 1.0]))
],
["label", "features"])
mlp = MultilayerPerceptronClassifier(layers=[2, 2, 2], seed=123)
model = mlp.fit(df)
self.assertTrue(model.hasSummary)
s = model.summary()
# test that api is callable and returns expected types
self.assertTrue(isinstance(s.predictions, DataFrame))
self.assertEqual(s.labelCol, "label")
self.assertEqual(s.predictionCol, "prediction")
self.assertGreater(s.totalIterations, 0)
self.assertTrue(isinstance(s.labels, list))
self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
self.assertTrue(isinstance(s.precisionByLabel, list))
self.assertTrue(isinstance(s.recallByLabel, list))
self.assertTrue(isinstance(s.fMeasureByLabel(), list))
self.assertTrue(isinstance(s.fMeasureByLabel(1.0), list))
self.assertAlmostEqual(s.accuracy, 1.0, 2)
self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
self.assertAlmostEqual(s.weightedFMeasure(1.0), 1.0, 2)
# test evaluation (with training dataset) produces a summary with same values
# one check is enough to verify a summary is returned, Scala version runs full test
sameSummary = model.evaluate(df)
self.assertTrue(isinstance(sameSummary, MultilayerPerceptronClassificationSummary))
self.assertAlmostEqual(sameSummary.accuracy, s.accuracy)

def test_gaussian_mixture_summary(self):
data = [(Vectors.dense(1.0),), (Vectors.dense(5.0),), (Vectors.dense(10.0),),
(Vectors.sparse(1, [], []),)]