diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 82b8e14f010af..fac4d92b1810c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -98,6 +98,24 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va @Since("2.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + val metric = $(metricName) match { + case "areaUnderROC" => metrics.areaUnderROC() + case "areaUnderPR" => metrics.areaUnderPR() + } + metrics.unpersist() + metric + } + + /** + * Get a BinaryClassificationMetrics, which can be used to get binary classification + * metrics such as areaUnderROC and areaUnderPR. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return BinaryClassificationMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): BinaryClassificationMetrics = { val schema = dataset.schema SchemaUtils.checkColumnTypes(schema, $(rawPredictionCol), Seq(DoubleType, new VectorUDT)) SchemaUtils.checkNumericType(schema, $(labelCol)) @@ -119,13 +137,7 @@ class BinaryClassificationEvaluator @Since("1.4.0") (@Since("1.4.0") override va case Row(rawPrediction: Double, label: Double, weight: Double) => (rawPrediction, label, weight) } - val metrics = new BinaryClassificationMetrics(scoreAndLabelsWithWeights, $(numBins)) - val metric = $(metricName) match { - case "areaUnderROC" => metrics.areaUnderROC() - case "areaUnderPR" => metrics.areaUnderPR() - } - metrics.unpersist() - metric + new BinaryClassificationMetrics(scoreAndLabelsWithWeights, $(numBins)) } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala index 641a1eb5f61db..63b99a0de4b65 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala @@ -17,16 +17,12 @@ package org.apache.spark.ml.evaluation -import org.apache.spark.SparkContext import org.apache.spark.annotation.Since -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} import org.apache.spark.ml.util._ -import org.apache.spark.sql.{Column, DataFrame, Dataset} -import org.apache.spark.sql.functions.{avg, col, udf} -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.functions.col /** * Evaluator for clustering results. @@ -102,22 +98,33 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str @Since("2.3.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + + $(metricName) match { + case ("silhouette") => metrics.silhouette + case (other) => + throw new IllegalArgumentException(s"No support for metric $other") + } + } + + /** + * Get a ClusteringMetrics, which can be used to get clustering metrics such as + * silhouette score. + * + * @param dataset a dataset that contains labels/observations and predictions. 
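+   *                A minimal usage sketch (the `irisDataset` name is illustrative and
+   *                mirrors the new `ClusteringEvaluatorSuite` test):
+   *                {{{
+   *                  val evaluator = new ClusteringEvaluator()
+   *                    .setFeaturesCol("features")
+   *                    .setPredictionCol("label")
+   *                  val silhouette = evaluator.getMetrics(irisDataset).silhouette
+   *                }}}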
+ * @return ClusteringMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): ClusteringMetrics = { SchemaUtils.validateVectorCompatibleColumn(dataset.schema, $(featuresCol)) SchemaUtils.checkNumericType(dataset.schema, $(predictionCol)) val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol)) val df = dataset.select(col($(predictionCol)), vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata)) - - ($(metricName), $(distanceMeasure)) match { - case ("silhouette", "squaredEuclidean") => - SquaredEuclideanSilhouette.computeSilhouetteScore( - df, $(predictionCol), $(featuresCol)) - case ("silhouette", "cosine") => - CosineSilhouette.computeSilhouetteScore(df, $(predictionCol), $(featuresCol)) - case (mn, dm) => - throw new IllegalArgumentException(s"No support for metric $mn, distance $dm") - } + val metrics = new ClusteringMetrics(df) + metrics.setDistanceMeasure($(distanceMeasure)) + metrics } @Since("3.0.0") @@ -136,523 +143,3 @@ object ClusteringEvaluator override def load(path: String): ClusteringEvaluator = super.load(path) } - - -private[evaluation] abstract class Silhouette { - - /** - * It computes the Silhouette coefficient for a point. - */ - def pointSilhouetteCoefficient( - clusterIds: Set[Double], - pointClusterId: Double, - pointClusterNumOfPoints: Long, - averageDistanceToCluster: (Double) => Double): Double = { - if (pointClusterNumOfPoints == 1) { - // Single-element clusters have silhouette 0 - 0.0 - } else { - // Here we compute the average dissimilarity of the current point to any cluster of which the - // point is not a member. - // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current - // point - is said to be the "neighboring cluster". - val otherClusterIds = clusterIds.filter(_ != pointClusterId) - val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min - // adjustment for excluding the node itself from the computation of the average dissimilarity - val currentClusterDissimilarity = - averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints / - (pointClusterNumOfPoints - 1) - if (currentClusterDissimilarity < neighboringClusterDissimilarity) { - 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) - } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) { - (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1 - } else { - 0.0 - } - } - } - - /** - * Compute the mean Silhouette values of all samples. - */ - def overallScore(df: DataFrame, scoreColumn: Column): Double = { - df.select(avg(scoreColumn)).collect()(0).getDouble(0) - } -} - -/** - * SquaredEuclideanSilhouette computes the average of the - * Silhouette over all the data of the dataset, which is - * a measure of how appropriately the data have been clustered. - * - * The Silhouette for each point `i` is defined as: - * - *
- * $$ - * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}} - * $$ - *
- * - * which can be rewritten as - * - *
- * $$ - * s_{i}= \begin{cases} - * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\ - * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases} - * $$ - *
- * - * where `$a_{i}$` is the average dissimilarity of `i` with all other data - * within the same cluster, `$b_{i}$` is the lowest average dissimilarity - * of `i` to any other cluster, of which `i` is not a member. - * `$a_{i}$` can be interpreted as how well `i` is assigned to its cluster - * (the smaller the value, the better the assignment), while `$b_{i}$` is - * a measure of how well `i` has not been assigned to its "neighboring cluster", - * ie. the nearest cluster to `i`. - * - * Unfortunately, the naive implementation of the algorithm requires to compute - * the distance of each couple of points in the dataset. Since the computation of - * the distance measure takes `D` operations - if `D` is the number of dimensions - * of each point, the computational complexity of the algorithm is `O(N^2^*D)`, where - * `N` is the cardinality of the dataset. Of course this is not scalable in `N`, - * which is the critical number in a Big Data context. - * - * The algorithm which is implemented in this object, instead, is an efficient - * and parallel implementation of the Silhouette using the squared Euclidean - * distance measure. - * - * With this assumption, the total distance of the point `X` - * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is: - * - *
- * $$ - * \sum\limits_{i=1}^N d(X, C_{i} ) = - * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big) - * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 + - * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{j}c_{ij} \Big) - * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 + - * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 - * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} - * $$ - *
- * - * where `$x_{j}$` is the `j`-th dimension of the point `X` and - * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster `$\Gamma$`. - * - * Then, the first term of the equation can be rewritten as: - * - *
- * $$ - * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ , - * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2 - * $$ - *
- * - * where `$\xi_{X}$` is fixed for each point and it can be precomputed. - * - * Moreover, the second term is fixed for each cluster too, - * thus we can name it `$\Psi_{\Gamma}$` - * - *
- * $$ - * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 = - * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma} - * $$ - *
- * - * Last, the third element becomes - * - *
- * $$ - * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} = - * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} - * $$ - *
- * - * thus defining the vector - * - *
- * $$ - * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D - * $$ - *
- * - * which is fixed for each cluster `$\Gamma$`, we have - * - *
- * $$ - * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} = - * \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} - * $$ - *
- * - * In this way, the previous equation becomes - * - *
- * $$ - * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} - * $$ - *
- * - * and the average distance of a point to a cluster can be computed as - * - *
- * $$ - * \frac{\sum\limits_{i=1}^N d(X, C_{i} )}{N} = - * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} = - * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} - * $$ - *
- * - * Thus, it is enough to precompute: the constant `$\xi_{X}$` for each point `X`; the - * constants `$\Psi_{\Gamma}$`, `N` and the vector `$Y_{\Gamma}$` for - * each cluster `$\Gamma$`. - * - * In the implementation, the precomputed values for the clusters - * are distributed among the worker nodes via broadcasted variables, - * because we can assume that the clusters are limited in number and - * anyway they are much fewer than the points. - * - * The main strengths of this algorithm are the low computational complexity - * and the intrinsic parallelism. The precomputed information for each point - * and for each cluster can be computed with a computational complexity - * which is `O(N/W)`, where `N` is the number of points in the dataset and - * `W` is the number of worker nodes. After that, every point can be - * analyzed independently of the others. - * - * For every point we need to compute the average distance to all the clusters. - * Since the formula above requires `O(D)` operations, this phase has a - * computational complexity which is `O(C*D*N/W)` where `C` is the number of - * clusters (which we assume quite low), `D` is the number of dimensions, - * `N` is the number of points in the dataset and `W` is the number - * of worker nodes. - */ -private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { - - private[this] var kryoRegistrationPerformed: Boolean = false - - /** - * This method registers the class - * [[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]] - * for kryo serialization. - * - * @param sc `SparkContext` to be used - */ - def registerKryoClasses(sc: SparkContext): Unit = { - if (!kryoRegistrationPerformed) { - sc.getConf.registerKryoClasses( - Array( - classOf[SquaredEuclideanSilhouette.ClusterStats] - ) - ) - kryoRegistrationPerformed = true - } - } - - case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long) - - /** - * The method takes the input dataset and computes the aggregated values - * about a cluster which are needed by the algorithm. - * - * @param df The DataFrame which contains the input data - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return A [[scala.collection.immutable.Map]] which associates each cluster id - * to a [[ClusterStats]] object (which contains the precomputed values `N`, - * `$\Psi_{\Gamma}$` and `$Y_{\Gamma}$` for a cluster). 
- */ - def computeClusterStats( - df: DataFrame, - predictionCol: String, - featuresCol: String): Map[Double, ClusterStats] = { - val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) - val clustersStatsRDD = df.select( - col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm")) - .rdd - .map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) } - .aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))( - seqOp = { - case ( - (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long), - (features, squaredNorm) - ) => - BLAS.axpy(1.0, features, featureSum) - (featureSum, squaredNormSum + squaredNorm, numOfPoints + 1) - }, - combOp = { - case ( - (featureSum1, squaredNormSum1, numOfPoints1), - (featureSum2, squaredNormSum2, numOfPoints2) - ) => - BLAS.axpy(1.0, featureSum2, featureSum1) - (featureSum1, squaredNormSum1 + squaredNormSum2, numOfPoints1 + numOfPoints2) - } - ) - - clustersStatsRDD - .collectAsMap() - .mapValues { - case (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long) => - SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, numOfPoints) - } - .toMap - } - - /** - * It computes the Silhouette coefficient for a point. - * - * @param broadcastedClustersMap A map of the precomputed values for each cluster. - * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point. - * @param clusterId The id of the cluster the current point belongs to. - * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point. - * @return The Silhouette for the point. - */ - def computeSilhouetteCoefficient( - broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]], - point: Vector, - clusterId: Double, - squaredNorm: Double): Double = { - - def compute(targetClusterId: Double): Double = { - val clusterStats = broadcastedClustersMap.value(targetClusterId) - val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum) - - squaredNorm + - clusterStats.squaredNormSum / clusterStats.numOfPoints - - 2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints - } - - pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, - clusterId, - broadcastedClustersMap.value(clusterId).numOfPoints, - compute) - } - - /** - * Compute the Silhouette score of the dataset using squared Euclidean distance measure. - * - * @param dataset The input dataset (previously clustered) on which compute the Silhouette. - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return The average of the Silhouette values of the clustered data. 
- */ - def computeSilhouetteScore( - dataset: Dataset[_], - predictionCol: String, - featuresCol: String): Double = { - SquaredEuclideanSilhouette.registerKryoClasses(dataset.sparkSession.sparkContext) - - val squaredNormUDF = udf { - features: Vector => math.pow(Vectors.norm(features, 2.0), 2.0) - } - val dfWithSquaredNorm = dataset.withColumn("squaredNorm", squaredNormUDF(col(featuresCol))) - - // compute aggregate values for clusters needed by the algorithm - val clustersStatsMap = SquaredEuclideanSilhouette - .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol) - - // Silhouette is reasonable only when the number of clusters is greater then 1 - assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") - - val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) - - val computeSilhouetteCoefficientUDF = udf { - computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double) - } - - val silhouetteScore = overallScore(dfWithSquaredNorm, - computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType), - col("squaredNorm"))) - - bClustersStatsMap.destroy() - - silhouetteScore - } -} - - -/** - * The algorithm which is implemented in this object, instead, is an efficient and parallel - * implementation of the Silhouette using the cosine distance measure. The cosine distance - * measure is defined as `1 - s` where `s` is the cosine similarity between two points. - * - * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$` - * is: - * - *
- * $$ - * \sum\limits_{i=1}^N d(X, C_{i} ) = - * \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big) - * = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} - * \frac{c_{ij}}{\|C_{i}\|} - * = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N - * \frac{c_{ij}}{\|C_{i}\|} \Big) - * $$ - *
- * - * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension - * of the `i`-th point in cluster `$\Gamma$`. - * - * Then, we can define the vector: - * - *
- * $$ - * \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D - * $$ - *
- * - * which can be precomputed for each point and the vector - * - *
- * $$ - * \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D - * $$ - *
- * - * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`. - * - * With these definitions, the numerator becomes: - * - *
- * $$ - * N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j} - * $$ - *
- * - * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is: - * - *
- * $$ - * 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N} - * $$ - *
- * - * In the implementation, the precomputed values for the clusters are distributed among the worker - * nodes via broadcasted variables, because we can assume that the clusters are limited in number. - * - * The main strengths of this algorithm are the low computational complexity and the intrinsic - * parallelism. The precomputed information for each point and for each cluster can be computed - * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the - * dataset and `W` is the number of worker nodes. After that, every point can be analyzed - * independently from the others. - * - * For every point we need to compute the average distance to all the clusters. Since the formula - * above requires `O(D)` operations, this phase has a computational complexity which is - * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number - * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker - * nodes. - */ -private[evaluation] object CosineSilhouette extends Silhouette { - - private[this] val normalizedFeaturesColName = "normalizedFeatures" - - /** - * The method takes the input dataset and computes the aggregated values - * about a cluster which are needed by the algorithm. - * - * @param df The DataFrame which contains the input data - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a - * its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`). - */ - def computeClusterStats( - df: DataFrame, - featuresCol: String, - predictionCol: String): Map[Double, (Vector, Long)] = { - val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) - val clustersStatsRDD = df.select( - col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName)) - .rdd - .map { row => (row.getDouble(0), row.getAs[Vector](1)) } - .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))( - seqOp = { - case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) => - BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum) - (normalizedFeaturesSum, numOfPoints + 1) - }, - combOp = { - case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) => - BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1) - (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2) - } - ) - - clustersStatsRDD - .collectAsMap() - .toMap - } - - /** - * It computes the Silhouette coefficient for a point. - * - * @param broadcastedClustersMap A map of the precomputed values for each cluster. - * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the - * normalized features of the current point. - * @param clusterId The id of the cluster the current point belongs to. 
- */ - def computeSilhouetteCoefficient( - broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]], - normalizedFeatures: Vector, - clusterId: Double): Double = { - - def compute(targetClusterId: Double): Double = { - val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId) - 1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints - } - - pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, - clusterId, - broadcastedClustersMap.value(clusterId)._2, - compute) - } - - /** - * Compute the Silhouette score of the dataset using the cosine distance measure. - * - * @param dataset The input dataset (previously clustered) on which compute the Silhouette. - * @param predictionCol The name of the column which contains the predicted cluster id - * for the point. - * @param featuresCol The name of the column which contains the feature vector of the point. - * @return The average of the Silhouette values of the clustered data. - */ - def computeSilhouetteScore( - dataset: Dataset[_], - predictionCol: String, - featuresCol: String): Double = { - val normalizeFeatureUDF = udf { - features: Vector => { - val norm = Vectors.norm(features, 2.0) - BLAS.scal(1.0 / norm, features) - features - } - } - val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName, - normalizeFeatureUDF(col(featuresCol))) - - // compute aggregate values for clusters needed by the algorithm - val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, featuresCol, - predictionCol) - - // Silhouette is reasonable only when the number of clusters is greater then 1 - assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") - - val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) - - val computeSilhouetteCoefficientUDF = udf { - computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double) - } - - val silhouetteScore = overallScore(dfWithNormalizedFeatures, - computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName), - col(predictionCol).cast(DoubleType))) - - bClustersStatsMap.destroy() - - silhouetteScore - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala new file mode 100644 index 0000000000000..30970337d7d3b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.Since +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} +import org.apache.spark.ml.util.MetadataUtils +import org.apache.spark.sql.{Column, DataFrame, Dataset} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.DoubleType + + +/** + * Metrics for clustering, which expects two input columns: prediction and label. + */ +@Since("3.1.0") +class ClusteringMetrics private[spark](dataset: Dataset[_]) { + + private var distanceMeasure: String = "squaredEuclidean" + + def getDistanceMeasure: String = distanceMeasure + + def setDistanceMeasure(value: String) : Unit = distanceMeasure = value + + /** + * Returns the silhouette score + */ + @Since("3.1.0") + lazy val silhouette: Double = { + val columns = dataset.columns.toSeq + if (distanceMeasure.equalsIgnoreCase("squaredEuclidean")) { + SquaredEuclideanSilhouette.computeSilhouetteScore( + dataset, columns(0), columns(1)) + } else { + CosineSilhouette.computeSilhouetteScore(dataset, columns(0), columns(1)) + } + } +} + + +private[evaluation] abstract class Silhouette { + + /** + * It computes the Silhouette coefficient for a point. + */ + def pointSilhouetteCoefficient( + clusterIds: Set[Double], + pointClusterId: Double, + pointClusterNumOfPoints: Long, + averageDistanceToCluster: (Double) => Double): Double = { + if (pointClusterNumOfPoints == 1) { + // Single-element clusters have silhouette 0 + 0.0 + } else { + // Here we compute the average dissimilarity of the current point to any cluster of which the + // point is not a member. + // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current + // point - is said to be the "neighboring cluster". + val otherClusterIds = clusterIds.filter(_ != pointClusterId) + val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min + // adjustment for excluding the node itself from the computation of the average dissimilarity + val currentClusterDissimilarity = + averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints / + (pointClusterNumOfPoints - 1) + if (currentClusterDissimilarity < neighboringClusterDissimilarity) { + 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity) + } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) { + (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1 + } else { + 0.0 + } + } + } + + /** + * Compute the mean Silhouette values of all samples. + */ + def overallScore(df: DataFrame, scoreColumn: Column): Double = { + df.select(avg(scoreColumn)).collect()(0).getDouble(0) + } +} + +/** + * SquaredEuclideanSilhouette computes the average of the + * Silhouette over all the data of the dataset, which is + * a measure of how appropriately the data have been clustered. + * + * The Silhouette for each point `i` is defined as: + * + *
+ * $$ + * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}} + * $$ + *
+ * + * which can be rewritten as + * + *
+ * $$ + * s_{i}= \begin{cases} + * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\ + * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases} + * $$ + *
+ * + * where `$a_{i}$` is the average dissimilarity of `i` with all other data + * within the same cluster, `$b_{i}$` is the lowest average dissimilarity + * of `i` to any other cluster, of which `i` is not a member. + * `$a_{i}$` can be interpreted as how well `i` is assigned to its cluster + * (the smaller the value, the better the assignment), while `$b_{i}$` is + * a measure of how well `i` has not been assigned to its "neighboring cluster", + * ie. the nearest cluster to `i`. + * + * Unfortunately, the naive implementation of the algorithm requires to compute + * the distance of each couple of points in the dataset. Since the computation of + * the distance measure takes `D` operations - if `D` is the number of dimensions + * of each point, the computational complexity of the algorithm is `O(N^2^*D)`, where + * `N` is the cardinality of the dataset. Of course this is not scalable in `N`, + * which is the critical number in a Big Data context. + * + * The algorithm which is implemented in this object, instead, is an efficient + * and parallel implementation of the Silhouette using the squared Euclidean + * distance measure. + * + * With this assumption, the total distance of the point `X` + * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is: + * + *
+ * $$ + * \sum\limits_{i=1}^N d(X, C_{i} ) = + * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big) + * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 + + * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{j}c_{ij} \Big) + * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 + + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 + * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} + * $$ + *
+ * + * where `$x_{j}$` is the `j`-th dimension of the point `X` and + * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster `$\Gamma$`. + * + * Then, the first term of the equation can be rewritten as: + * + *
+ * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ , + * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2 + * $$ + *
+ * + * where `$\xi_{X}$` is fixed for each point and it can be precomputed. + * + * Moreover, the second term is fixed for each cluster too, + * thus we can name it `$\Psi_{\Gamma}$` + * + *
+ * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 = + * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma} + * $$ + *
+ * + * Last, the third element becomes + * + *
+ * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} = + * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} + * $$ + *
+ * + * thus defining the vector + * + *
+ * $$ + * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D + * $$ + *
+ * + * which is fixed for each cluster `$\Gamma$`, we have + * + *
+ * $$ + * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} = + * \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} + * $$ + *
+ * + * In this way, the previous equation becomes + * + *
+ * $$ + * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} + * $$ + *
+ * + * and the average distance of a point to a cluster can be computed as + * + *
+ * $$ + * \frac{\sum\limits_{i=1}^N d(X, C_{i} )}{N} = + * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} = + * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} + * $$ + *
+ * + * Thus, it is enough to precompute: the constant `$\xi_{X}$` for each point `X`; the + * constants `$\Psi_{\Gamma}$`, `N` and the vector `$Y_{\Gamma}$` for + * each cluster `$\Gamma$`. + * + * In the implementation, the precomputed values for the clusters + * are distributed among the worker nodes via broadcasted variables, + * because we can assume that the clusters are limited in number and + * anyway they are much fewer than the points. + * + * The main strengths of this algorithm are the low computational complexity + * and the intrinsic parallelism. The precomputed information for each point + * and for each cluster can be computed with a computational complexity + * which is `O(N/W)`, where `N` is the number of points in the dataset and + * `W` is the number of worker nodes. After that, every point can be + * analyzed independently of the others. + * + * For every point we need to compute the average distance to all the clusters. + * Since the formula above requires `O(D)` operations, this phase has a + * computational complexity which is `O(C*D*N/W)` where `C` is the number of + * clusters (which we assume quite low), `D` is the number of dimensions, + * `N` is the number of points in the dataset and `W` is the number + * of worker nodes. + */ +private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { + + private[this] var kryoRegistrationPerformed: Boolean = false + + /** + * This method registers the class + * [[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]] + * for kryo serialization. + * + * @param sc `SparkContext` to be used + */ + def registerKryoClasses(sc: SparkContext): Unit = { + if (!kryoRegistrationPerformed) { + sc.getConf.registerKryoClasses( + Array( + classOf[SquaredEuclideanSilhouette.ClusterStats] + ) + ) + kryoRegistrationPerformed = true + } + } + + case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long) + + /** + * The method takes the input dataset and computes the aggregated values + * about a cluster which are needed by the algorithm. + * + * @param df The DataFrame which contains the input data + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return A [[scala.collection.immutable.Map]] which associates each cluster id + * to a [[ClusterStats]] object (which contains the precomputed values `N`, + * `$\Psi_{\Gamma}$` and `$Y_{\Gamma}$` for a cluster). 
+ */ + def computeClusterStats( + df: DataFrame, + predictionCol: String, + featuresCol: String): Map[Double, ClusterStats] = { + val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) + val clustersStatsRDD = df.select( + col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm")) + .rdd + .map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) } + .aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))( + seqOp = { + case ( + (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long), + (features, squaredNorm) + ) => + BLAS.axpy(1.0, features, featureSum) + (featureSum, squaredNormSum + squaredNorm, numOfPoints + 1) + }, + combOp = { + case ( + (featureSum1, squaredNormSum1, numOfPoints1), + (featureSum2, squaredNormSum2, numOfPoints2) + ) => + BLAS.axpy(1.0, featureSum2, featureSum1) + (featureSum1, squaredNormSum1 + squaredNormSum2, numOfPoints1 + numOfPoints2) + } + ) + + clustersStatsRDD + .collectAsMap() + .mapValues { + case (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long) => + SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, numOfPoints) + } + .toMap + } + + /** + * It computes the Silhouette coefficient for a point. + * + * @param broadcastedClustersMap A map of the precomputed values for each cluster. + * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point. + * @param clusterId The id of the cluster the current point belongs to. + * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point. + * @return The Silhouette for the point. + */ + def computeSilhouetteCoefficient( + broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]], + point: Vector, + clusterId: Double, + squaredNorm: Double): Double = { + + def compute(targetClusterId: Double): Double = { + val clusterStats = broadcastedClustersMap.value(targetClusterId) + val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum) + + squaredNorm + + clusterStats.squaredNormSum / clusterStats.numOfPoints - + 2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints + } + + pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, + clusterId, + broadcastedClustersMap.value(clusterId).numOfPoints, + compute) + } + + /** + * Compute the Silhouette score of the dataset using squared Euclidean distance measure. + * + * @param dataset The input dataset (previously clustered) on which compute the Silhouette. + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return The average of the Silhouette values of the clustered data. 
+ */ + def computeSilhouetteScore( + dataset: Dataset[_], + predictionCol: String, + featuresCol: String): Double = { + SquaredEuclideanSilhouette.registerKryoClasses(dataset.sparkSession.sparkContext) + + val squaredNormUDF = udf { + features: Vector => math.pow(Vectors.norm(features, 2.0), 2.0) + } + val dfWithSquaredNorm = dataset.withColumn("squaredNorm", squaredNormUDF(col(featuresCol))) + + // compute aggregate values for clusters needed by the algorithm + val clustersStatsMap = SquaredEuclideanSilhouette + .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol) + + // Silhouette is reasonable only when the number of clusters is greater then 1 + assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") + + val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) + + val computeSilhouetteCoefficientUDF = udf { + computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double) + } + + val silhouetteScore = overallScore(dfWithSquaredNorm, + computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType), + col("squaredNorm"))) + + bClustersStatsMap.destroy() + + silhouetteScore + } +} + + +/** + * The algorithm which is implemented in this object, instead, is an efficient and parallel + * implementation of the Silhouette using the cosine distance measure. The cosine distance + * measure is defined as `1 - s` where `s` is the cosine similarity between two points. + * + * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$` + * is: + * + *
+ * $$ + * \sum\limits_{i=1}^N d(X, C_{i} ) = + * \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big) + * = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} + * \frac{c_{ij}}{\|C_{i}\|} + * = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N + * \frac{c_{ij}}{\|C_{i}\|} \Big) + * $$ + *
+ * + * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension + * of the `i`-th point in cluster `$\Gamma$`. + * + * Then, we can define the vector: + * + *
+ * $$ + * \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D + * $$ + *
+ * + * which can be precomputed for each point and the vector + * + *
+ * $$ + * \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D + * $$ + *
+ * + * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`. + * + * With these definitions, the numerator becomes: + * + *
+ * $$ + * N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j} + * $$ + *
+ * + * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is: + * + *
+ * $$ + * 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N} + * $$ + *
+ * + * In the implementation, the precomputed values for the clusters are distributed among the worker + * nodes via broadcasted variables, because we can assume that the clusters are limited in number. + * + * The main strengths of this algorithm are the low computational complexity and the intrinsic + * parallelism. The precomputed information for each point and for each cluster can be computed + * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the + * dataset and `W` is the number of worker nodes. After that, every point can be analyzed + * independently from the others. + * + * For every point we need to compute the average distance to all the clusters. Since the formula + * above requires `O(D)` operations, this phase has a computational complexity which is + * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number + * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker + * nodes. + */ +private[evaluation] object CosineSilhouette extends Silhouette { + + private[this] val normalizedFeaturesColName = "normalizedFeatures" + + /** + * The method takes the input dataset and computes the aggregated values + * about a cluster which are needed by the algorithm. + * + * @param df The DataFrame which contains the input data + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a + * its statistics (ie. the precomputed values `N` and `$\Omega_{\Gamma}$`). + */ + def computeClusterStats( + df: DataFrame, + featuresCol: String, + predictionCol: String): Map[Double, (Vector, Long)] = { + val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol) + val clustersStatsRDD = df.select( + col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName)) + .rdd + .map { row => (row.getDouble(0), row.getAs[Vector](1)) } + .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))( + seqOp = { + case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) => + BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum) + (normalizedFeaturesSum, numOfPoints + 1) + }, + combOp = { + case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) => + BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1) + (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2) + } + ) + + clustersStatsRDD + .collectAsMap() + .toMap + } + + /** + * It computes the Silhouette coefficient for a point. + * + * @param broadcastedClustersMap A map of the precomputed values for each cluster. + * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the + * normalized features of the current point. + * @param clusterId The id of the cluster the current point belongs to. 
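+   * @return The Silhouette for the point.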
+ */ + def computeSilhouetteCoefficient( + broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]], + normalizedFeatures: Vector, + clusterId: Double): Double = { + + def compute(targetClusterId: Double): Double = { + val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId) + 1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints + } + + pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet, + clusterId, + broadcastedClustersMap.value(clusterId)._2, + compute) + } + + /** + * Compute the Silhouette score of the dataset using the cosine distance measure. + * + * @param dataset The input dataset (previously clustered) on which compute the Silhouette. + * @param predictionCol The name of the column which contains the predicted cluster id + * for the point. + * @param featuresCol The name of the column which contains the feature vector of the point. + * @return The average of the Silhouette values of the clustered data. + */ + def computeSilhouetteScore( + dataset: Dataset[_], + predictionCol: String, + featuresCol: String): Double = { + val normalizeFeatureUDF = udf { + features: Vector => { + val norm = Vectors.norm(features, 2.0) + BLAS.scal(1.0 / norm, features) + features + } + } + val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName, + normalizeFeatureUDF(col(featuresCol))) + + // compute aggregate values for clusters needed by the algorithm + val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, featuresCol, + predictionCol) + + // Silhouette is reasonable only when the number of clusters is greater then 1 + assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.") + + val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap) + + val computeSilhouetteCoefficientUDF = udf { + computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double) + } + + val silhouetteScore = overallScore(dfWithNormalizedFeatures, + computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName), + col(predictionCol).cast(DoubleType))) + + bClustersStatsMap.destroy() + + silhouetteScore + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala index 1d6540e970383..ad1b70915e157 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala @@ -153,6 +153,34 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid @Since("2.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + $(metricName) match { + case "f1" => metrics.weightedFMeasure + case "accuracy" => metrics.accuracy + case "weightedPrecision" => metrics.weightedPrecision + case "weightedRecall" => metrics.weightedRecall + case "weightedTruePositiveRate" => metrics.weightedTruePositiveRate + case "weightedFalsePositiveRate" => metrics.weightedFalsePositiveRate + case "weightedFMeasure" => metrics.weightedFMeasure($(beta)) + case "truePositiveRateByLabel" => metrics.truePositiveRate($(metricLabel)) + case "falsePositiveRateByLabel" => metrics.falsePositiveRate($(metricLabel)) + case "precisionByLabel" => metrics.precision($(metricLabel)) + case "recallByLabel" => metrics.recall($(metricLabel)) + case "fMeasureByLabel" => 
metrics.fMeasure($(metricLabel), $(beta)) + case "hammingLoss" => metrics.hammingLoss + case "logLoss" => metrics.logLoss($(eps)) + } + } + + /** + * Get a MulticlassMetrics, which can be used to get multiclass classification + * metrics such as accuracy, weightedPrecision, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return MulticlassMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): MulticlassMetrics = { val schema = dataset.schema SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType) SchemaUtils.checkNumericType(schema, $(labelCol)) @@ -163,9 +191,13 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid lit(1.0) } - val rdd = if ($(metricName) == "logLoss") { + if ($(metricName) == "logLoss") { // probabilityCol is only needed to compute logloss - require(isDefined(probabilityCol) && $(probabilityCol).nonEmpty) + require(schema.fieldNames.contains($(probabilityCol)), + "probabilityCol is needed to compute logloss") + } + + val rdd = if (schema.fieldNames.contains($(probabilityCol))) { val p = DatasetUtils.columnToVector(dataset, $(probabilityCol)) dataset.select(col($(predictionCol)), col($(labelCol)).cast(DoubleType), w, p) .rdd.map { @@ -179,23 +211,7 @@ class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") overrid } } - val metrics = new MulticlassMetrics(rdd) - $(metricName) match { - case "f1" => metrics.weightedFMeasure - case "accuracy" => metrics.accuracy - case "weightedPrecision" => metrics.weightedPrecision - case "weightedRecall" => metrics.weightedRecall - case "weightedTruePositiveRate" => metrics.weightedTruePositiveRate - case "weightedFalsePositiveRate" => metrics.weightedFalsePositiveRate - case "weightedFMeasure" => metrics.weightedFMeasure($(beta)) - case "truePositiveRateByLabel" => metrics.truePositiveRate($(metricLabel)) - case "falsePositiveRateByLabel" => metrics.falsePositiveRate($(metricLabel)) - case "precisionByLabel" => metrics.precision($(metricLabel)) - case "recallByLabel" => metrics.recall($(metricLabel)) - case "fMeasureByLabel" => metrics.fMeasure($(metricLabel), $(beta)) - case "hammingLoss" => metrics.hammingLoss - case "logLoss" => metrics.logLoss($(eps)) - } + new MulticlassMetrics(rdd) } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala index a8db5452bd56c..1a82ac7a9472f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluator.scala @@ -98,18 +98,7 @@ class MultilabelClassificationEvaluator @Since("3.0.0") (@Since("3.0.0") overrid @Since("3.0.0") override def evaluate(dataset: Dataset[_]): Double = { - val schema = dataset.schema - SchemaUtils.checkColumnTypes(schema, $(predictionCol), - Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) - SchemaUtils.checkColumnTypes(schema, $(labelCol), - Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) - - val predictionAndLabels = - dataset.select(col($(predictionCol)), col($(labelCol))) - .rdd.map { row => - (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray) - } - val metrics = new MultilabelMetrics(predictionAndLabels) + val metrics = getMetrics(dataset) $(metricName) match { case "subsetAccuracy" => metrics.subsetAccuracy case "accuracy" => 
metrics.accuracy @@ -126,6 +115,29 @@ class MultilabelClassificationEvaluator @Since("3.0.0") (@Since("3.0.0") overrid } } + /** + * Get a MultilabelMetrics, which can be used to get multilabel classification + * metrics such as accuracy, precision, precisionByLabel, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return MultilabelMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): MultilabelMetrics = { + val schema = dataset.schema + SchemaUtils.checkColumnTypes(schema, $(predictionCol), + Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) + SchemaUtils.checkColumnTypes(schema, $(labelCol), + Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) + + val predictionAndLabels = + dataset.select(col($(predictionCol)), col($(labelCol))) + .rdd.map { row => + (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray) + } + new MultilabelMetrics(predictionAndLabels) + } + @Since("3.0.0") override def isLargerBetter: Boolean = { $(metricName) match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala index c5dea6c177e21..82dda4109771d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RankingEvaluator.scala @@ -95,6 +95,25 @@ class RankingEvaluator @Since("3.0.0") (@Since("3.0.0") override val uid: String @Since("3.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + $(metricName) match { + case "meanAveragePrecision" => metrics.meanAveragePrecision + case "meanAveragePrecisionAtK" => metrics.meanAveragePrecisionAt($(k)) + case "precisionAtK" => metrics.precisionAt($(k)) + case "ndcgAtK" => metrics.ndcgAt($(k)) + case "recallAtK" => metrics.recallAt($(k)) + } + } + + /** + * Get a RankingMetrics, which can be used to get ranking metrics + * such as meanAveragePrecision, meanAveragePrecisionAtK, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. 
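+   *                A minimal usage sketch (the `scoreAndLabels` DataFrame is illustrative and
+   *                mirrors the new `RankingEvaluatorSuite` test; `prediction` and `label` are
+   *                array-of-double columns):
+   *                {{{
+   *                  val evaluator = new RankingEvaluator().setK(5)
+   *                  val metrics = evaluator.getMetrics(scoreAndLabels)
+   *                  val ndcg = metrics.ndcgAt(5)
+   *                }}}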
+ * @return RankingMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): RankingMetrics[Double] = { val schema = dataset.schema SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(ArrayType(DoubleType, false), ArrayType(DoubleType, true))) @@ -106,14 +125,7 @@ class RankingEvaluator @Since("3.0.0") (@Since("3.0.0") override val uid: String .rdd.map { row => (row.getSeq[Double](0).toArray, row.getSeq[Double](1).toArray) } - val metrics = new RankingMetrics[Double](predictionAndLabels) - $(metricName) match { - case "meanAveragePrecision" => metrics.meanAveragePrecision - case "meanAveragePrecisionAtK" => metrics.meanAveragePrecisionAt($(k)) - case "precisionAtK" => metrics.precisionAt($(k)) - case "ndcgAtK" => metrics.ndcgAt($(k)) - case "recallAtK" => metrics.recallAt($(k)) - } + new RankingMetrics[Double](predictionAndLabels) } @Since("3.0.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala index 18a8dda0c76ef..aca017762deca 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/RegressionEvaluator.scala @@ -97,6 +97,25 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui @Since("2.0.0") override def evaluate(dataset: Dataset[_]): Double = { + val metrics = getMetrics(dataset) + $(metricName) match { + case "rmse" => metrics.rootMeanSquaredError + case "mse" => metrics.meanSquaredError + case "r2" => metrics.r2 + case "mae" => metrics.meanAbsoluteError + case "var" => metrics.explainedVariance + } + } + + /** + * Get a RegressionMetrics, which can be used to get regression + * metrics such as rootMeanSquaredError, meanSquaredError, etc. + * + * @param dataset a dataset that contains labels/observations and predictions. + * @return RegressionMetrics + */ + @Since("3.1.0") + def getMetrics(dataset: Dataset[_]): RegressionMetrics = { val schema = dataset.schema SchemaUtils.checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType)) SchemaUtils.checkNumericType(schema, $(labelCol)) @@ -107,14 +126,7 @@ final class RegressionEvaluator @Since("1.4.0") (@Since("1.4.0") override val ui .rdd .map { case Row(prediction: Double, label: Double, weight: Double) => (prediction, label, weight) } - val metrics = new RegressionMetrics(predictionAndLabelsWithWeights, $(throughOrigin)) - $(metricName) match { - case "rmse" => metrics.rootMeanSquaredError - case "mse" => metrics.meanSquaredError - case "r2" => metrics.r2 - case "mae" => metrics.meanAbsoluteError - case "var" => metrics.explainedVariance - } + new RegressionMetrics(predictionAndLabelsWithWeights, $(throughOrigin)) } @Since("1.4.0") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala index 050ebb0fa4fbd..1a91801a9da28 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala @@ -283,7 +283,8 @@ class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[_ <: Product]) (loss * weight, weight) case other => - throw new IllegalArgumentException(s"Expected quadruples, got $other") + throw new IllegalArgumentException(s"Invalid RDD value for MulticlassMetrics.logLoss. 
" + + s"Expected quadruples, got $other") }.treeReduce { case ((l1, w1), (l2, w2)) => (l1 + l2, w1 + w2) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala index 83b213ab51d43..008bf0e108e13 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluatorSuite.scala @@ -102,4 +102,27 @@ class BinaryClassificationEvaluatorSuite val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("prediction") MLTestingUtils.checkNumericTypes(evaluator, spark) } + + test("getMetrics") { + val weightCol = "weight" + // get metric with weight column + val evaluator = new BinaryClassificationEvaluator() + .setWeightCol(weightCol) + val vectorDF = Seq( + (0.0, Vectors.dense(2.5, 12), 1.0), + (1.0, Vectors.dense(1, 3), 1.0), + (0.0, Vectors.dense(10, 2), 1.0) + ).toDF("label", "rawPrediction", weightCol) + + val metrics = evaluator.getMetrics(vectorDF) + val roc = metrics.areaUnderROC() + val pr = metrics.areaUnderPR() + + // default = areaUnderROC + assert(evaluator.evaluate(vectorDF) == roc) + + // areaUnderPR + evaluator.setMetricName("areaUnderPR") + assert(evaluator.evaluate(vectorDF) == pr) + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala index 6cf3b1deeac93..29fed5322c9c9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala @@ -145,4 +145,20 @@ class ClusteringEvaluatorSuite assert(evaluator.evaluate(twoSingleItemClusters) === 0.0) } + test("getMetrics") { + val evaluator = new ClusteringEvaluator() + .setFeaturesCol("features") + .setPredictionCol("label") + + val metrics1 = evaluator.getMetrics(irisDataset) + val silhouetteScoreEuclidean = metrics1.silhouette + + assert(evaluator.evaluate(irisDataset) == silhouetteScoreEuclidean) + + evaluator.setDistanceMeasure("cosine") + val metrics2 = evaluator.getMetrics(irisDataset) + val silhouetteScoreCosin = metrics2.silhouette + + assert(evaluator.evaluate(irisDataset) == silhouetteScoreCosin) + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala index 5b5212abdf7cc..3dfd860a5b9d8 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala @@ -80,4 +80,33 @@ class MulticlassClassificationEvaluatorSuite .setMetricName("logLoss") assert(evaluator.evaluate(df) ~== 0.9682005730687164 absTol 1e-5) } + + test("getMetrics") { + val predictionAndLabels = Seq((0.0, 0.0), (0.0, 1.0), + (0.0, 0.0), (1.0, 0.0), (1.0, 1.0), + (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)).toDF("prediction", "label") + + val evaluator = new MulticlassClassificationEvaluator() + + val metrics = evaluator.getMetrics(predictionAndLabels) + val f1 = metrics.weightedFMeasure + val accuracy = metrics.accuracy + val precisionByLabel = metrics.precision(evaluator.getMetricLabel) + + // default = f1 + 
+    assert(evaluator.evaluate(predictionAndLabels) == f1)
+
+    // accuracy
+    evaluator.setMetricName("accuracy")
+    assert(evaluator.evaluate(predictionAndLabels) == accuracy)
+
+    // precisionByLabel
+    evaluator.setMetricName("precisionByLabel")
+    assert(evaluator.evaluate(predictionAndLabels) == precisionByLabel)
+
+    // truePositiveRateByLabel
+    evaluator.setMetricName("truePositiveRateByLabel").setMetricLabel(1.0)
+    assert(evaluator.evaluate(predictionAndLabels) ==
+      metrics.truePositiveRate(evaluator.getMetricLabel))
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
index f41fc04a5faed..520103d6aed92 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/MultilabelClassificationEvaluatorSuite.scala
@@ -59,4 +59,52 @@ class MultilabelClassificationEvaluatorSuite
       .setMetricName("precisionByLabel")
     testDefaultReadWrite(evaluator)
   }
+
+  test("getMetrics") {
+    val scoreAndLabels = Seq((Array(0.0, 1.0), Array(0.0, 2.0)),
+      (Array(0.0, 2.0), Array(0.0, 1.0)),
+      (Array.empty[Double], Array(0.0)),
+      (Array(2.0), Array(2.0)),
+      (Array(2.0, 0.0), Array(2.0, 0.0)),
+      (Array(0.0, 1.0, 2.0), Array(0.0, 1.0)),
+      (Array(1.0), Array(1.0, 2.0))).toDF("prediction", "label")
+
+    val evaluator = new MultilabelClassificationEvaluator()
+
+    val metrics = evaluator.getMetrics(scoreAndLabels)
+    val f1 = metrics.f1Measure
+    val accuracy = metrics.accuracy
+    val precision = metrics.precision
+    val recall = metrics.recall
+    val hammingLoss = metrics.hammingLoss
+    val precisionByLabel = metrics.precision(evaluator.getMetricLabel)
+
+    // default = f1
+    assert(evaluator.evaluate(scoreAndLabels) == f1)
+
+    // accuracy
+    evaluator.setMetricName("accuracy")
+    assert(evaluator.evaluate(scoreAndLabels) == accuracy)
+
+    // precision
+    evaluator.setMetricName("precision")
+    assert(evaluator.evaluate(scoreAndLabels) == precision)
+
+    // recall
+    evaluator.setMetricName("recall")
+    assert(evaluator.evaluate(scoreAndLabels) == recall)
+
+    // hammingLoss
+    evaluator.setMetricName("hammingLoss")
+    assert(evaluator.evaluate(scoreAndLabels) == hammingLoss)
+
+    // precisionByLabel
+    evaluator.setMetricName("precisionByLabel")
+    assert(evaluator.evaluate(scoreAndLabels) == precisionByLabel)
+
+    // recallByLabel
+    evaluator.setMetricName("recallByLabel").setMetricLabel(1.0)
+    assert(evaluator.evaluate(scoreAndLabels) ==
+      metrics.recall(evaluator.getMetricLabel))
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
index 02d26d7eb351f..b3457981a08e9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RankingEvaluatorSuite.scala
@@ -59,4 +59,42 @@ class RankingEvaluatorSuite
       .setK(2)
     assert(evaluator.evaluate(scoreAndLabels) ~== 1.0 / 3 absTol 1e-5)
   }
+
+  test("getMetrics") {
+    val scoreAndLabels = Seq(
+      (Array(1.0, 6.0, 2.0, 7.0, 8.0, 3.0, 9.0, 10.0, 4.0, 5.0),
+        Array(1.0, 2.0, 3.0, 4.0, 5.0)),
+      (Array(4.0, 1.0, 5.0, 6.0, 2.0, 7.0, 3.0, 8.0, 9.0, 10.0),
+        Array(1.0, 2.0, 3.0)),
+      (Array(1.0, 2.0, 3.0, 4.0, 5.0), Array.empty[Double])
+    ).toDF("prediction", "label")
+
+    val evaluator = new RankingEvaluator().setK(5)
+
+    val metrics = evaluator.getMetrics(scoreAndLabels)
+    val meanAveragePrecision = metrics.meanAveragePrecision
+    val meanAveragePrecisionAtK = metrics.meanAveragePrecisionAt(evaluator.getK)
+    val precisionAtK = metrics.precisionAt(evaluator.getK)
+    val ndcgAtK = metrics.ndcgAt(evaluator.getK)
+    val recallAtK = metrics.recallAt(evaluator.getK)
+
+    // default = meanAveragePrecision
+    assert(evaluator.evaluate(scoreAndLabels) == meanAveragePrecision)
+
+    // meanAveragePrecisionAtK
+    evaluator.setMetricName("meanAveragePrecisionAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == meanAveragePrecisionAtK)
+
+    // precisionAtK
+    evaluator.setMetricName("precisionAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == precisionAtK)
+
+    // ndcgAtK
+    evaluator.setMetricName("ndcgAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == ndcgAtK)
+
+    // recallAtK
+    evaluator.setMetricName("recallAtK")
+    assert(evaluator.evaluate(scoreAndLabels) == recallAtK)
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
index f4f858c3e92dc..5ee161ce8dd33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/RegressionEvaluatorSuite.scala
@@ -93,4 +93,37 @@ class RegressionEvaluatorSuite
   test("should support all NumericType labels and not support other types") {
     MLTestingUtils.checkNumericTypes(new RegressionEvaluator, spark)
   }
+
+  test("getMetrics") {
+    val dataset = LinearDataGenerator.generateLinearInput(
+      6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1)
+      .map(_.asML).toDF()
+
+    val trainer = new LinearRegression
+    val model = trainer.fit(dataset)
+    val predictions = model.transform(dataset)
+
+    val evaluator = new RegressionEvaluator()
+
+    val metrics = evaluator.getMetrics(predictions)
+    val rmse = metrics.rootMeanSquaredError
+    val r2 = metrics.r2
+    val mae = metrics.meanAbsoluteError
+    val variance = metrics.explainedVariance
+
+    // default = rmse
+    assert(evaluator.evaluate(predictions) == rmse)
+
+    // r2 score
+    evaluator.setMetricName("r2")
+    assert(evaluator.evaluate(predictions) == r2)
+
+    // mae
+    evaluator.setMetricName("mae")
+    assert(evaluator.evaluate(predictions) == mae)
+
+    // var
+    evaluator.setMetricName("var")
+    assert(evaluator.evaluate(predictions) == variance)
+  }
 }
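
Usage sketch (editor's note, not part of the patch): the snippet below shows how user code might call the getMetrics API introduced above to read several metrics from one RegressionMetrics instance instead of calling evaluate() once per metric name. It assumes the RegressionEvaluator defaults of a "prediction" and a "label" column; the object name, sample data, and SparkSession settings are illustrative only.

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.sql.SparkSession

object GetMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("GetMetricsSketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative (prediction, label) pairs; column names match the evaluator defaults.
    val predictions = Seq(
      (2.5, 3.0),
      (0.0, -0.5),
      (2.1, 2.0),
      (7.8, 8.0)
    ).toDF("prediction", "label")

    val evaluator = new RegressionEvaluator()

    // A single RegressionMetrics instance serves several metric queries.
    val metrics = evaluator.getMetrics(predictions)
    println(s"rmse = ${metrics.rootMeanSquaredError}")
    println(s"mae  = ${metrics.meanAbsoluteError}")
    println(s"r2   = ${metrics.r2}")

    spark.stop()
  }
}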