From 2f79bb2d5c7e29e85a4a7abe63254d392a49fe53 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 16 Feb 2018 17:03:09 +0100
Subject: [PATCH 1/6] [SPARK-23451][ML] Deprecate KMeans.computeCost

---
 .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 6 +++++-
 python/pyspark/ml/clustering.py                            | 6 ++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index c8145de564cb..c9e180cc923e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -149,8 +149,12 @@ class KMeansModel private[ml] (
   /**
    * Return the K-means cost (sum of squared distances of points to their nearest center) for this
    * model on the given data.
+   *
+   * @deprecated This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator
+   *             instead.
    */
-  // TODO: Replace the temp fix when we have proper evaluators defined for clustering.
+  @deprecated("This method is deprecated and will be removed in 3.0.0." +
+    " Use ClusteringEvaluator instead.", "2.4.0")
   @Since("2.0.0")
   def computeCost(dataset: Dataset[_]): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 6448b76a0da8..ab9b65278114 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+import warnings
+
 from pyspark import since, keyword_only
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
@@ -320,7 +322,11 @@ def computeCost(self, dataset):
         """
         Return the K-means cost (sum of squared distances of points to their nearest center)
         for this model on the given data.
+
+        ..note:: Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator instead.
         """
+        warnings.warn("Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator"
Use ClusteringEvaluator" + " instead.", DeprecationWarning) return self._call_java("computeCost", dataset) @property From ca8c2ece6a82b0ab805dacf3606333f3baa41b32 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 11 Apr 2018 16:08:20 +0200 Subject: [PATCH 2/6] add kmeansCost --- .../ml/evaluation/ClusteringEvaluator.scala | 45 ++++++++++++++++--- .../spark/mllib/clustering/KMeans.scala | 2 +- .../evaluation/ClusteringEvaluatorSuite.scala | 22 +++++++++ 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 8d4ae562b3d2..4bc59a7f11eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -23,8 +23,9 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT} import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} -import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, - SchemaUtils} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.mllib.clustering.{DistanceMeasure, VectorWithNorm} +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.sql.{Column, DataFrame, Dataset} import org.apache.spark.sql.functions.{avg, col, udf} import org.apache.spark.sql.types.DoubleType @@ -63,12 +64,12 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str /** * param for metric name in evaluation - * (supports `"silhouette"` (default)) + * (supports `"silhouette"` (default), `"kmeansCost"`) * @group param */ @Since("2.3.0") val metricName: Param[String] = { - val allowedParams = ParamValidators.inArray(Array("silhouette")) + val allowedParams = ParamValidators.inArray(Array("silhouette", "kmeansCost")) new Param( this, "metricName", "metric name in evaluation (silhouette)", allowedParams) } @@ -83,12 +84,12 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str /** * param for distance measure to be used in evaluation - * (supports `"squaredEuclidean"` (default), `"cosine"`) + * (supports `"squaredEuclidean"` (default), `"cosine"`, `"euclidean"`) * @group param */ @Since("2.4.0") val distanceMeasure: Param[String] = { - val availableValues = Array("squaredEuclidean", "cosine") + val availableValues = Array("squaredEuclidean", "cosine", "euclidean") val allowedParams = ParamValidators.inArray(availableValues) new Param(this, "distanceMeasure", "distance measure in evaluation. 
Supported options: " + availableValues.mkString("'", "', '", "'"), allowedParams) @@ -104,6 +105,23 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str setDefault(metricName -> "silhouette", distanceMeasure -> "squaredEuclidean") + /** + * param for clusterCenters to be used in evaluation of the + * kmeansCost metric + * @group param + */ + @Since("2.4.0") + val clusterCenters: Param[Array[Vector]] = new Param(this, "clusterCenters", + "Cluster centers used in the metric kmeansCost.", ParamValidators.arrayLengthGt[Vector](0)) + + /** @group getParam */ + @Since("2.4.0") + def getClusterCenters: Array[Vector] = $(clusterCenters) + + /** @group setParam */ + @Since("2.4.0") + def setClusterCenters(value: Array[Vector]): this.type = set(clusterCenters, value) + @Since("2.3.0") override def evaluate(dataset: Dataset[_]): Double = { SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) @@ -115,6 +133,21 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str dataset, $(predictionCol), $(featuresCol)) case ("silhouette", "cosine") => CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol)) + case ("silhouette", dm) => + throw new IllegalArgumentException(s"Silhouette does not support $dm distance measure.") + case ("kmeansCost", dm) => + assert(isSet(clusterCenters), "Cluster centers need to be set for evaluating kmeansCost.") + assert(Seq("cosine", "euclidean").contains(dm), + s"kmeansCost does not support $dm distance measure.") + val distanceMeasure = DistanceMeasure.decodeFromString(dm) + val bClusterCenters = dataset.sparkSession.sparkContext.broadcast( + $(clusterCenters).map(p => new VectorWithNorm(OldVectors.fromML(p)))) + val cost = dataset.select($(featuresCol)).rdd.map { row => + distanceMeasure.pointCost(bClusterCenters.value, + new VectorWithNorm(OldVectors.fromML(row.getAs[Vector](0)))) + }.sum() + bClusterCenters.destroy(false) + cost } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index b5b1be349049..5d822f11b9c6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -586,7 +586,7 @@ object KMeans { /** * A vector with its norm for fast distance computation. 
  */
-private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double)
+private[spark] class VectorWithNorm(val vector: Vector, val norm: Double)
   extends Serializable {
 
   def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 3bf34770f568..3786e2b13816 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.ml.evaluation
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.clustering.{KMeans, KMeansSuite}
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.ml.util.TestingUtils._
@@ -100,4 +102,24 @@ class ClusteringEvaluatorSuite
     }
   }
 
+  test("SPARK-23451: kmeansCost metric") {
+    val k = 4
+    val dataset = KMeansSuite.generateKMeansData(spark, 40, 3, k)
+      .filter(_.getAs[Vector](0).numNonzeros > 1)
+    Seq("euclidean", "cosine").foreach { distanceMeasure =>
+      val kmeans = new KMeans()
+        .setK(k)
+        .setSeed(1)
+        .setDistanceMeasure(distanceMeasure)
+      val model = kmeans.fit(dataset)
+      val predicted = model.transform(dataset)
+
+      val evaluator = new ClusteringEvaluator()
+        .setMetricName("kmeansCost")
+        .setClusterCenters(model.clusterCenters)
+        .setDistanceMeasure(distanceMeasure)
+
+      assert(evaluator.evaluate(predicted) ~== model.computeCost(dataset) relTol 1e-5)
+    }
+  }
 }

From e39ecd14357923113aa8eb69da8ebede28723f09 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 29 Jun 2018 10:47:44 +0200
Subject: [PATCH 3/6] Revert "add kmeansCost"

This reverts commit ca8c2ece6a82b0ab805dacf3606333f3baa41b32.
---
 .../ml/evaluation/ClusteringEvaluator.scala   | 45 +++----------
 .../spark/mllib/clustering/KMeans.scala       |  2 +-
 .../evaluation/ClusteringEvaluatorSuite.scala | 23 ----------
 3 files changed, 7 insertions(+), 63 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
index a7ecd60a3419..4353c46781e9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
@@ -24,9 +24,8 @@ import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
-import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
-import org.apache.spark.mllib.clustering.{DistanceMeasure, VectorWithNorm}
-import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable,
+  SchemaUtils}
 import org.apache.spark.sql.{Column, DataFrame, Dataset}
 import org.apache.spark.sql.functions.{avg, col, udf}
 import org.apache.spark.sql.types.DoubleType
@@ -65,12 +64,12 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
 
   /**
    * param for metric name in evaluation
-   * (supports `"silhouette"` (default), `"kmeansCost"`)
+   * (supports `"silhouette"` (default))
    * @group param
    */
   @Since("2.3.0")
   val metricName: Param[String] = {
-    val allowedParams = ParamValidators.inArray(Array("silhouette", "kmeansCost"))
+    val allowedParams = ParamValidators.inArray(Array("silhouette"))
     new Param(
       this, "metricName", "metric name in evaluation (silhouette)", allowedParams)
   }
@@ -85,12 +84,12 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
 
   /**
    * param for distance measure to be used in evaluation
-   * (supports `"squaredEuclidean"` (default), `"cosine"`, `"euclidean"`)
+   * (supports `"squaredEuclidean"` (default), `"cosine"`)
    * @group param
    */
   @Since("2.4.0")
   val distanceMeasure: Param[String] = {
-    val availableValues = Array("squaredEuclidean", "cosine", "euclidean")
+    val availableValues = Array("squaredEuclidean", "cosine")
     val allowedParams = ParamValidators.inArray(availableValues)
     new Param(this, "distanceMeasure", "distance measure in evaluation. Supported options: " +
Supported options: " + availableValues.mkString("'", "', '", "'"), allowedParams) @@ -106,23 +105,6 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str setDefault(metricName -> "silhouette", distanceMeasure -> "squaredEuclidean") - /** - * param for clusterCenters to be used in evaluation of the - * kmeansCost metric - * @group param - */ - @Since("2.4.0") - val clusterCenters: Param[Array[Vector]] = new Param(this, "clusterCenters", - "Cluster centers used in the metric kmeansCost.", ParamValidators.arrayLengthGt[Vector](0)) - - /** @group getParam */ - @Since("2.4.0") - def getClusterCenters: Array[Vector] = $(clusterCenters) - - /** @group setParam */ - @Since("2.4.0") - def setClusterCenters(value: Array[Vector]): this.type = set(clusterCenters, value) - @Since("2.3.0") override def evaluate(dataset: Dataset[_]): Double = { SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) @@ -134,21 +116,6 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str dataset, $(predictionCol), $(featuresCol)) case ("silhouette", "cosine") => CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol)) - case ("silhouette", dm) => - throw new IllegalArgumentException(s"Silhouette does not support $dm distance measure.") - case ("kmeansCost", dm) => - assert(isSet(clusterCenters), "Cluster centers need to be set for evaluating kmeansCost.") - assert(Seq("cosine", "euclidean").contains(dm), - s"kmeansCost does not support $dm distance measure.") - val distanceMeasure = DistanceMeasure.decodeFromString(dm) - val bClusterCenters = dataset.sparkSession.sparkContext.broadcast( - $(clusterCenters).map(p => new VectorWithNorm(OldVectors.fromML(p)))) - val cost = dataset.select($(featuresCol)).rdd.map { row => - distanceMeasure.pointCost(bClusterCenters.value, - new VectorWithNorm(OldVectors.fromML(row.getAs[Vector](0)))) - }.sum() - bClusterCenters.destroy(false) - cost } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 5d822f11b9c6..b5b1be349049 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -586,7 +586,7 @@ object KMeans { /** * A vector with its norm for fast distance computation. 
  */
-private[spark] class VectorWithNorm(val vector: Vector, val norm: Double)
+private[clustering] class VectorWithNorm(val vector: Vector, val norm: Double)
   extends Serializable {
 
   def this(vector: Vector) = this(vector, Vectors.norm(vector, 2.0))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 0296550f753b..0992f3db75fd 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.ml.evaluation
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.clustering.{KMeans, KMeansSuite}
-import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.ml.util.TestingUtils._
@@ -103,27 +101,6 @@ class ClusteringEvaluatorSuite
     }
   }
 
-  test("SPARK-23451: kmeansCost metric") {
-    val k = 4
-    val dataset = KMeansSuite.generateKMeansData(spark, 40, 3, k)
-      .filter(_.getAs[Vector](0).numNonzeros > 1)
-    Seq("euclidean", "cosine").foreach { distanceMeasure =>
-      val kmeans = new KMeans()
-        .setK(k)
-        .setSeed(1)
-        .setDistanceMeasure(distanceMeasure)
-      val model = kmeans.fit(dataset)
-      val predicted = model.transform(dataset)
-
-      val evaluator = new ClusteringEvaluator()
-        .setMetricName("kmeansCost")
-        .setClusterCenters(model.clusterCenters)
-        .setDistanceMeasure(distanceMeasure)
-
-      assert(evaluator.evaluate(predicted) ~== model.computeCost(dataset) relTol 1e-5)
-    }
-  }
-
   test("SPARK-23568: we should use metadata to determine features number") {
     val attributesNum = irisDataset.select("features").rdd.first().getAs[Vector](0).size
     val attrGroup = new AttributeGroup("features", attributesNum)

From be954c64658f311622a16f9aa2c9f717ec9e7f21 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Fri, 29 Jun 2018 12:18:38 +0200
Subject: [PATCH 4/6] add training cost to KMeansSummary

---
 .../apache/spark/ml/clustering/KMeans.scala    | 14 +++++++++-----
 .../apache/spark/mllib/clustering/KMeans.scala |  2 +-
 .../spark/mllib/clustering/KMeansModel.scala   | 10 ++++++----
 .../spark/ml/clustering/KMeansSuite.scala      |  2 ++
 .../evaluation/ClusteringEvaluatorSuite.scala  |  1 +
 python/pyspark/ml/clustering.py                | 18 +++++++++++++++---
 6 files changed, 34 insertions(+), 13 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 5a49f65c0f67..c8949725c6f4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -142,10 +142,10 @@ class KMeansModel private[ml] (
    * model on the given data.
    *
    * @deprecated This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator
-   *             instead.
+   *             instead. You can also get the cost on the training dataset in the summary.
    */
-  @deprecated("This method is deprecated and will be removed in 3.0.0." +
-    " Use ClusteringEvaluator instead.", "2.4.0")
+  @deprecated("This method is deprecated and will be removed in 3.0.0.Use ClusteringEvaluator " +
+    "instead. You can also get the cost on the training dataset in the summary.", "2.4.0")
   @Since("2.0.0")
   def computeCost(dataset: Dataset[_]): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
@@ -336,7 +336,7 @@ class KMeans @Since("1.5.0") (
     val parentModel = algo.run(instances, Option(instr))
     val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
     val summary = new KMeansSummary(
-      model.transform(dataset), $(predictionCol), $(featuresCol), $(k))
+      model.transform(dataset), $(predictionCol), $(featuresCol), $(k), parentModel.trainingCost)
 
     model.setSummary(Some(summary))
     instr.logSuccess(model)
@@ -367,6 +367,8 @@ object KMeans extends DefaultParamsReadable[KMeans] {
  * @param predictionCol Name for column of predicted clusters in `predictions`.
  * @param featuresCol Name for column of features in `predictions`.
  * @param k Number of clusters.
+ * @param trainingCost K-means cost (sum of squared distances to the nearest centroid for all
+ *                     points in the training dataset). This is equivalent to sklearn's inertia.
  */
 @Since("2.0.0")
 @Experimental
@@ -374,4 +376,6 @@ class KMeansSummary private[clustering] (
     predictions: DataFrame,
     predictionCol: String,
     featuresCol: String,
-    k: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
+    k: Int,
+    @Since("2.4.0") val trainingCost: Double)
+  extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index b5b1be349049..1f294c267ad5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -348,7 +348,7 @@ class KMeans private (
 
     logInfo(s"The cost is $cost.")
 
-    new KMeansModel(centers.map(_.vector), distanceMeasure)
+    new KMeansModel(centers.map(_.vector), distanceMeasure, cost)
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index a78c21e838e4..793321c32234 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.{Row, SparkSession}
  */
 @Since("0.8.0")
 class KMeansModel @Since("2.4.0") (@Since("1.0.0") val clusterCenters: Array[Vector],
-  @Since("2.4.0") val distanceMeasure: String)
+  @Since("2.4.0") val distanceMeasure: String, @Since("2.4.0") val trainingCost: Double)
   extends Saveable with Serializable with PMMLExportable {
 
   private val distanceMeasureInstance: DistanceMeasure =
@@ -48,7 +48,7 @@ class KMeansModel @Since("2.4.0") (@Since("1.0.0") val clusterCenters: Array[Vec
 
   @Since("1.1.0")
   def this(clusterCenters: Array[Vector]) =
-    this(clusterCenters: Array[Vector], DistanceMeasure.EUCLIDEAN)
+    this(clusterCenters: Array[Vector], DistanceMeasure.EUCLIDEAN, 0.0)
 
   /**
    * A Java-friendly constructor that takes an Iterable of Vectors.
@@ -182,7 +182,8 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
-          ~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure)))
+          ~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure)
+          ~ ("trainingCost" -> model.trainingCost)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex).map { case (p, id) =>
         Cluster(id, p.vector)
@@ -202,7 +203,8 @@ object KMeansModel extends Loader[KMeansModel] {
       val localCentroids = centroids.rdd.map(Cluster.apply).collect()
       assert(k == localCentroids.length)
       val distanceMeasure = (metadata \ "distanceMeasure").extract[String]
-      new KMeansModel(localCentroids.sortBy(_.id).map(_.point), distanceMeasure)
+      val trainingCost = (metadata \ "trainingCost").extract[Double]
+      new KMeansModel(localCentroids.sortBy(_.id).map(_.point), distanceMeasure, trainingCost)
     }
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 32830b39407a..383a49238f7e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -126,6 +126,8 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
       assert(summary.predictions.columns.contains(c))
     }
     assert(summary.cluster.columns === Array(predictionColName))
+    assert(summary.trainingCost < 0.1)
+    assert(model.computeCost(dataset) == summary.trainingCost)
     val clusterSizes = summary.clusterSizes
     assert(clusterSizes.length === k)
     assert(clusterSizes.sum === numRows)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 0992f3db75fd..ab6ab775b20b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ml.evaluation
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.ml.util.TestingUtils._
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index fca788a3d043..b033b7151a85 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -303,7 +303,15 @@ class KMeansSummary(ClusteringSummary):
 
     .. versionadded:: 2.1.0
     """
-    pass
+
+    @property
+    @since("2.4.0")
+    def trainingCost(self):
+        """
+        K-means cost (sum of squared distances to the nearest centroid for all points in the
+        training dataset). This is equivalent to sklearn's inertia.
+        """
+        return self._call_java("trainingCost")
 
 
 class KMeansModel(JavaModel, JavaMLWritable, JavaMLReadable):
@@ -325,9 +333,11 @@ def computeCost(self, dataset):
         for this model on the given data.
 
         ..note:: Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator instead.
+            You can also get the cost on the training dataset in the summary.
         """
-        warnings.warn("Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator"
Use ClusteringEvaluator" - " instead.", DeprecationWarning) + warnings.warn("Deprecated in 2.4.0. It will be removed in 3.0.0. Use ClusteringEvaluator " + "instead. You can also get the cost on the training dataset in the summary.", + DeprecationWarning) return self._call_java("computeCost", dataset) @property @@ -383,6 +393,8 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol 2 >>> summary.clusterSizes [2, 2] + >>> summary.trainingCost + 2.000... >>> kmeans_path = temp_path + "/kmeans" >>> kmeans.save(kmeans_path) >>> kmeans2 = KMeans.load(kmeans_path) From 64b0fca56abc72f69806bb4166a6ed6e8fd0a715 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 29 Jun 2018 12:53:10 +0200 Subject: [PATCH 5/6] fix mima --- project/MimaExcludes.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a87fa68422c3..559d92d17248 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,8 @@ object MimaExcludes { // Exclude rules for 2.4.x lazy val v24excludes = v23excludes ++ Seq( + // [SPARK-23451][ML] Deprecate KMeans.computeCost + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansSummary.this"), // [SPARK-22941][core] Do not exit JVM when submit fails with in-process launcher. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.printWarning"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.parseSparkConfProperty"), From 5a5c31fb43b809d2324d27cad1796f8aae8549ce Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 29 Jun 2018 13:01:18 +0200 Subject: [PATCH 6/6] cleanup --- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 2 +- .../apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index c8949725c6f4..15707422aef3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -144,7 +144,7 @@ class KMeansModel private[ml] ( * @deprecated This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator * instead. You can also get the cost on the training dataset in the summary. */ - @deprecated("This method is deprecated and will be removed in 3.0.0.Use ClusteringEvaluator " + + @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " + "instead. You can also get the cost on the training dataset in the summary.", "2.4.0") @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index ab6ab775b20b..2c175ff68e0b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -62,8 +62,8 @@ class ClusteringEvaluatorSuite */ test("squared euclidean Silhouette") { val evaluator = new ClusteringEvaluator() - .setFeaturesCol("features") - .setPredictionCol("label") + .setFeaturesCol("features") + .setPredictionCol("label") assert(evaluator.evaluate(irisDataset) ~== 0.6564679231 relTol 1e-5) }