@@ -19,10 +19,11 @@ package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Since
import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol, HasWeightCol}
import org.apache.spark.ml.util._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

/**
* Evaluator for clustering results.
@@ -34,7 +35,8 @@ import org.apache.spark.sql.functions.col
*/
@Since("2.3.0")
class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String)
extends Evaluator with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable {
extends Evaluator with HasPredictionCol with HasFeaturesCol with HasWeightCol
with DefaultParamsWritable {

@Since("2.3.0")
def this() = this(Identifiable.randomUID("cluEval"))
@@ -53,6 +55,10 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
@Since("2.3.0")
def setFeaturesCol(value: String): this.type = set(featuresCol, value)

/** @group setParam */
@Since("3.1.0")
def setWeightCol(value: String): this.type = set(weightCol, value)

/**
* param for metric name in evaluation
* (supports `"silhouette"` (default))
@@ -116,12 +122,26 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
*/
@Since("3.1.0")
def getMetrics(dataset: Dataset[_]): ClusteringMetrics = {
SchemaUtils.validateVectorCompatibleColumn(dataset.schema, $(featuresCol))
SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
val schema = dataset.schema
SchemaUtils.validateVectorCompatibleColumn(schema, $(featuresCol))
SchemaUtils.checkNumericType(schema, $(predictionCol))
if (isDefined(weightCol)) {
SchemaUtils.checkNumericType(schema, $(weightCol))
}

val weightColName = if (!isDefined(weightCol)) "weightCol" else $(weightCol)

val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol))
val df = dataset.select(col($(predictionCol)),
vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata))
val df = if (!isDefined(weightCol) || $(weightCol).isEmpty) {
dataset.select(col($(predictionCol)),
vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata),
lit(1.0).as(weightColName))
} else {
dataset.select(col($(predictionCol)),
vectorCol.as($(featuresCol), dataset.schema($(featuresCol)).metadata),
col(weightColName).cast(DoubleType))
}

val metrics = new ClusteringMetrics(df)
metrics.setDistanceMeasure($(distanceMeasure))
metrics
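
For context, a minimal sketch of how the new weightCol parameter could be exercised once this lands. This is illustrative only, not part of the patch: it assumes an existing SparkSession named `spark`, and the DataFrame, column names and values are made up.

import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.linalg.Vectors

// Hypothetical clustered data: (features, prediction, weight).
val predictions = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 0.1), 0, 1.0),
  (Vectors.dense(0.2, 0.0), 0, 2.0),
  (Vectors.dense(9.8, 9.9), 1, 1.0),
  (Vectors.dense(10.0, 9.7), 1, 0.5)
)).toDF("features", "prediction", "weight")

val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setWeightCol("weight")          // added by this change, @Since("3.1.0")
  .setDistanceMeasure("squaredEuclidean")

val weightedSilhouette = evaluator.evaluate(predictions)
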
@@ -47,9 +47,9 @@ class ClusteringMetrics private[spark](dataset: Dataset[_]) {
val columns = dataset.columns.toSeq
if (distanceMeasure.equalsIgnoreCase("squaredEuclidean")) {
SquaredEuclideanSilhouette.computeSilhouetteScore(
dataset, columns(0), columns(1))
dataset, columns(0), columns(1), columns(2))
} else {
CosineSilhouette.computeSilhouetteScore(dataset, columns(0), columns(1))
CosineSilhouette.computeSilhouetteScore(dataset, columns(0), columns(1), columns(2))
}
}
}
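
Because the weight column now rides along as the third column of the internal DataFrame, the same weighted score is also reachable through the metrics object. A hedged sketch, reusing the hypothetical evaluator and predictions from the example above:

// getMetrics is @Since("3.1.0"); silhouette() dispatches on the configured distance measure.
val metrics = evaluator.getMetrics(predictions)
val weightedScore = metrics.silhouette()
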
@@ -63,9 +63,10 @@ private[evaluation] abstract class Silhouette {
def pointSilhouetteCoefficient(
clusterIds: Set[Double],
pointClusterId: Double,
pointClusterNumOfPoints: Long,
weightSum: Double,
weight: Double,
averageDistanceToCluster: (Double) => Double): Double = {
if (pointClusterNumOfPoints == 1) {
if (weightSum == weight) {
// Clusters whose whole weight comes from this point (single-element clusters) have silhouette 0
0.0
} else {
@@ -77,8 +78,8 @@ private[evaluation] abstract class Silhouette {
val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min
// adjustment for excluding the node itself from the computation of the average dissimilarity
val currentClusterDissimilarity =
averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints /
(pointClusterNumOfPoints - 1)
averageDistanceToCluster(pointClusterId) * weightSum /
(weightSum - weight)
if (currentClusterDissimilarity < neighboringClusterDissimilarity) {
1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
} else if (currentClusterDissimilarity > neighboringClusterDissimilarity) {
@@ -92,8 +93,8 @@ private[evaluation] abstract class Silhouette {
/**
* Compute the mean Silhouette values of all samples.
*/
def overallScore(df: DataFrame, scoreColumn: Column): Double = {
df.select(avg(scoreColumn)).collect()(0).getDouble(0)
def overallScore(df: DataFrame, scoreColumn: Column, weightColumn: Column): Double = {
df.select(sum(scoreColumn * weightColumn) / sum(weightColumn)).collect()(0).getDouble(0)
}
}
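
overallScore now takes a weight column and computes a weighted mean rather than a plain avg; with every weight at 1.0 the two coincide. A tiny self-contained check of that equivalence (assuming an existing SparkSession named `spark`; column names are illustrative):

import org.apache.spark.sql.functions.{avg, col, sum}
import spark.implicits._  // `spark` is an assumed SparkSession

// Per-point silhouette values with unit weights.
val scores = Seq((0.8, 1.0), (0.4, 1.0), (0.6, 1.0)).toDF("silhouette", "weight")

val unweighted = scores.select(avg(col("silhouette"))).head.getDouble(0)
val weighted = scores
  .select(sum(col("silhouette") * col("weight")) / sum(col("weight")))
  .head.getDouble(0)

assert(math.abs(unweighted - weighted) < 1e-9)  // both 0.6 with unit weights
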

@@ -267,7 +268,7 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
}
}

case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long)
case class ClusterStats(featureSum: Vector, squaredNormSum: Double, weightSum: Double)

/**
* The method takes the input dataset and computes the aggregated values
@@ -277,43 +278,47 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @param featuresCol The name of the column which contains the feature vector of the point.
* @param weightCol The name of the column which contains the instance weight.
* @return A [[scala.collection.immutable.Map]] which associates each cluster id
* to a [[ClusterStats]] object (which contains the precomputed values `N`,
* `$\Psi_{\Gamma}$` and `$Y_{\Gamma}$` for a cluster).
*/
def computeClusterStats(
df: DataFrame,
predictionCol: String,
featuresCol: String): Map[Double, ClusterStats] = {
featuresCol: String,
weightCol: String): Map[Double, ClusterStats] = {
val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol)
val clustersStatsRDD = df.select(
col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm"))
col(predictionCol).cast(DoubleType), col(featuresCol), col("squaredNorm"), col(weightCol))
.rdd
.map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) }
.aggregateByKey[(DenseVector, Double, Long)]((Vectors.zeros(numFeatures).toDense, 0.0, 0L))(
seqOp = {
case (
(featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long),
(features, squaredNorm)
) =>
BLAS.axpy(1.0, features, featureSum)
(featureSum, squaredNormSum + squaredNorm, numOfPoints + 1)
},
combOp = {
case (
(featureSum1, squaredNormSum1, numOfPoints1),
(featureSum2, squaredNormSum2, numOfPoints2)
) =>
BLAS.axpy(1.0, featureSum2, featureSum1)
(featureSum1, squaredNormSum1 + squaredNormSum2, numOfPoints1 + numOfPoints2)
}
)
.map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2), row.getDouble(3))) }
.aggregateByKey
[(DenseVector, Double, Double)]((Vectors.zeros(numFeatures).toDense, 0.0, 0.0))(
seqOp = {
case (
(featureSum: DenseVector, squaredNormSum: Double, weightSum: Double),
(features, squaredNorm, weight)
) =>
require(weight >= 0.0, s"illegal weight value: $weight. weight must be >= 0.0.")
BLAS.axpy(weight, features, featureSum)
(featureSum, squaredNormSum + squaredNorm * weight, weightSum + weight)
},
combOp = {
case (
(featureSum1, squaredNormSum1, weightSum1),
(featureSum2, squaredNormSum2, weightSum2)
) =>
BLAS.axpy(1.0, featureSum2, featureSum1)
(featureSum1, squaredNormSum1 + squaredNormSum2, weightSum1 + weightSum2)
}
)

clustersStatsRDD
.collectAsMap()
.mapValues {
case (featureSum: DenseVector, squaredNormSum: Double, numOfPoints: Long) =>
SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, numOfPoints)
case (featureSum: DenseVector, squaredNormSum: Double, weightSum: Double) =>
SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, weightSum)
}
.toMap
}
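
What the aggregateByKey above accumulates per cluster is just three quantities: the weighted feature sum `$Y_{\Gamma}$`, the weighted squared-norm sum `$\Psi_{\Gamma}$`, and the total weight (which replaces the old point count). A local, driver-side sketch of the same bookkeeping on plain Scala collections; the types and names here are illustrative only, not the library API:

// A point as (clusterId, features, squaredNorm, weight); Stats mirrors ClusterStats.
final case class Pt(clusterId: Double, features: Array[Double], squaredNorm: Double, weight: Double)
final case class Stats(featureSum: Array[Double], squaredNormSum: Double, weightSum: Double)

def clusterStats(points: Seq[Pt], numFeatures: Int): Map[Double, Stats] =
  points.groupBy(_.clusterId).map { case (id, pts) =>
    val featureSum = Array.fill(numFeatures)(0.0)
    var squaredNormSum = 0.0
    var weightSum = 0.0
    pts.foreach { p =>
      require(p.weight >= 0.0, s"illegal weight value: ${p.weight}")
      var i = 0
      while (i < numFeatures) { featureSum(i) += p.weight * p.features(i); i += 1 }  // like BLAS.axpy(weight, ...)
      squaredNormSum += p.squaredNorm * p.weight  // squared norms are weighted too
      weightSum += p.weight                       // total weight replaces the point count
    }
    id -> Stats(featureSum, squaredNormSum, weightSum)
  }
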
@@ -324,27 +329,30 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
* @param broadcastedClustersMap A map of the precomputed values for each cluster.
* @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
* @param clusterId The id of the cluster the current point belongs to.
* @param weight The instance weight of the current point.
* @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point.
* @return The Silhouette for the point.
*/
def computeSilhouetteCoefficient(
broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]],
point: Vector,
clusterId: Double,
weight: Double,
squaredNorm: Double): Double = {

def compute(targetClusterId: Double): Double = {
val clusterStats = broadcastedClustersMap.value(targetClusterId)
val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum)

squaredNorm +
clusterStats.squaredNormSum / clusterStats.numOfPoints -
2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints
clusterStats.squaredNormSum / clusterStats.weightSum -
2 * pointDotClusterFeaturesSum / clusterStats.weightSum
}

pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
clusterId,
broadcastedClustersMap.value(clusterId).numOfPoints,
broadcastedClustersMap.value(clusterId).weightSum,
weight,
compute)
}
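
The compute closure above obtains the weighted average squared Euclidean distance from a point to a cluster using only those aggregates: `$\Xi_{X}$` + `$\Psi_{\Gamma}$` / W − 2 (X · `$Y_{\Gamma}$`) / W, i.e. the old formula with the point count N replaced by the cluster's weight sum W. A plain-Scala sketch of the same expression (self-contained, names illustrative):

// Cluster aggregates: weighted feature sum Y, weighted squared-norm sum Psi, total weight W.
final case class ClusterAgg(featureSum: Array[Double], squaredNormSum: Double, weightSum: Double)

// Weighted average squared distance from point x (with precomputed ||x||^2) to the cluster:
//   ||x||^2 + Psi / W - 2 * (x . Y) / W
def avgSquaredDistance(x: Array[Double], xSquaredNorm: Double, c: ClusterAgg): Double = {
  val dot = x.zip(c.featureSum).map { case (a, b) => a * b }.sum
  xSquaredNorm + c.squaredNormSum / c.weightSum - 2.0 * dot / c.weightSum
}
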

@@ -355,12 +363,14 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @param featuresCol The name of the column which contains the feature vector of the point.
* @param weightCol The name of the column which contains instance weight.
* @return The average of the Silhouette values of the clustered data.
*/
def computeSilhouetteScore(
dataset: Dataset[_],
predictionCol: String,
featuresCol: String): Double = {
featuresCol: String,
weightCol: String): Double = {
SquaredEuclideanSilhouette.registerKryoClasses(dataset.sparkSession.sparkContext)

val squaredNormUDF = udf {
@@ -370,20 +380,20 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {

// compute aggregate values for clusters needed by the algorithm
val clustersStatsMap = SquaredEuclideanSilhouette
.computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol)
.computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol, weightCol)

// Silhouette is reasonable only when the number of clusters is greater than 1
assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")

val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)

val computeSilhouetteCoefficientUDF = udf {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double, _: Double)
}

val silhouetteScore = overallScore(dfWithSquaredNorm,
computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
col("squaredNorm")))
col(weightCol), col("squaredNorm")), col(weightCol))

bClustersStatsMap.destroy()

@@ -472,30 +482,35 @@ private[evaluation] object CosineSilhouette extends Silhouette {
* about a cluster which are needed by the algorithm.
*
* @param df The DataFrame which contains the input data
* @param featuresCol The name of the column which contains the feature vector of the point.
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @param weightCol The name of the column which contains the instance weight.
@return A [[scala.collection.immutable.Map]] which associates each cluster id to
* its statistics (i.e. the precomputed values `N` and `$\Omega_{\Gamma}$`).
*/
def computeClusterStats(
df: DataFrame,
featuresCol: String,
predictionCol: String): Map[Double, (Vector, Long)] = {
predictionCol: String,
weightCol: String): Map[Double, (Vector, Double)] = {
val numFeatures = MetadataUtils.getNumFeatures(df, featuresCol)
val clustersStatsRDD = df.select(
col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName), col(weightCol))
.rdd
.map { row => (row.getDouble(0), row.getAs[Vector](1)) }
.aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
.map { row => (row.getDouble(0), (row.getAs[Vector](1), row.getDouble(2))) }
.aggregateByKey[(DenseVector, Double)]((Vectors.zeros(numFeatures).toDense, 0.0))(
seqOp = {
case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
(normalizedFeaturesSum, numOfPoints + 1)
case ((normalizedFeaturesSum: DenseVector, weightSum: Double),
(normalizedFeatures, weight)) =>
require(weight >= 0.0, s"illegal weight value: $weight. weight must be >= 0.0.")
BLAS.axpy(weight, normalizedFeatures, normalizedFeaturesSum)
(normalizedFeaturesSum, weightSum + weight)
},
combOp = {
case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
case ((normalizedFeaturesSum1, weightSum1), (normalizedFeaturesSum2, weightSum2)) =>
BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
(normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
(normalizedFeaturesSum1, weightSum1 + weightSum2)
}
)

@@ -511,11 +526,13 @@ private[evaluation] object CosineSilhouette extends Silhouette {
* @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the
* normalized features of the current point.
* @param clusterId The id of the cluster the current point belongs to.
* @param weight The instance weight of the current point.
*/
def computeSilhouetteCoefficient(
broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]],
broadcastedClustersMap: Broadcast[Map[Double, (Vector, Double)]],
normalizedFeatures: Vector,
clusterId: Double): Double = {
clusterId: Double,
weight: Double): Double = {

def compute(targetClusterId: Double): Double = {
val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId)
@@ -525,6 +542,7 @@ private[evaluation] object CosineSilhouette extends Silhouette {
pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
clusterId,
broadcastedClustersMap.value(clusterId)._2,
weight,
compute)
}
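
The cosine variant mirrors this: each cluster only needs the weighted sum of the L2-normalized feature vectors `$\Omega_{\Gamma}$` and its total weight, and the average cosine dissimilarity from a normalized point to a cluster is then 1 − (X̂ · `$\Omega_{\Gamma}$`) / W. A small self-contained sketch (illustrative names, not the library API):

// Cosine-silhouette cluster aggregates: weighted sum of normalized features and total weight.
final case class CosineAgg(normalizedFeatureSum: Array[Double], weightSum: Double)

// Weighted average cosine dissimilarity from an already L2-normalized point xHat to the cluster:
//   1 - (xHat . Omega) / W
def avgCosineDissimilarity(xHat: Array[Double], c: CosineAgg): Double = {
  val dot = xHat.zip(c.normalizedFeatureSum).map { case (a, b) => a * b }.sum
  1.0 - dot / c.weightSum
}
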

@@ -535,12 +553,14 @@ private[evaluation] object CosineSilhouette extends Silhouette {
* @param predictionCol The name of the column which contains the predicted cluster id
* for the point.
* @param featuresCol The name of the column which contains the feature vector of the point.
* @param weightCol The name of the column which contains the instance weight.
* @return The average of the Silhouette values of the clustered data.
*/
def computeSilhouetteScore(
dataset: Dataset[_],
predictionCol: String,
featuresCol: String): Double = {
featuresCol: String,
weightCol: String): Double = {
val normalizeFeatureUDF = udf {
features: Vector => {
val norm = Vectors.norm(features, 2.0)
@@ -553,20 +573,20 @@ private[evaluation] object CosineSilhouette extends Silhouette {

// compute aggregate values for clusters needed by the algorithm
val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, featuresCol,
predictionCol)
predictionCol, weightCol)

// Silhouette is reasonable only when the number of clusters is greater than 1
assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")

val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)

val computeSilhouetteCoefficientUDF = udf {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double)
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
}

val silhouetteScore = overallScore(dfWithNormalizedFeatures,
computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName),
col(predictionCol).cast(DoubleType)))
col(predictionCol).cast(DoubleType), col(weightCol)), col(weightCol))

bClustersStatsMap.destroy()
