mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -21,19 +21,19 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.functions.checkNonNegativeWeight
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
BisectingKMeansModel => MLlibBisectingKMeansModel}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.storage.StorageLevel


@@ -118,7 +118,7 @@ class BisectingKMeansModel private[ml] (
val outputSchema = transformSchema(dataset.schema, logging = true)
val predictUDF = udf((vector: Vector) => predict(vector))
dataset.withColumn($(predictionCol),
predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)),
predictUDF(columnToVector(dataset, getFeaturesCol)),
outputSchema($(predictionCol)).metadata)
}

@@ -152,7 +152,7 @@ class BisectingKMeansModel private[ml] (
"summary.", "3.0.0")
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol)
val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
val data = columnToOldVector(dataset, getFeaturesCol)
parentModel.computeCost(data)
}

@@ -287,13 +287,11 @@ class BisectingKMeans @Since("2.0.0") (
.setSeed($(seed))
.setDistanceMeasure($(distanceMeasure))

val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
checkNonNegativeWeight(col($(weightCol)).cast(DoubleType))
} else {
lit(1.0)
}
val instances = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w)
.rdd.map { case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) }
val instances = dataset.select(
checkNonNanVectors(columnToVector(dataset, $(featuresCol))),
checkNonNegativeWeights(get(weightCol))
).rdd.map { case Row(f: Vector, w: Double) => (OldVectors.fromML(f), w)
}.setName("training instances")

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val parentModel = bkm.runWithWeight(instances, handlePersistence, Some(instr))
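Note (not part of the diff): the same refactor appears in BisectingKMeans above and in GaussianMixture and KMeans below — the per-estimator weight-column branch is replaced by the shared DatasetUtils helpers. Only the `case _ => lit(1.0)` fallback of checkNonNegativeWeights is visible in the DatasetUtils.scala hunk further down, so the following is a hedged sketch of how such a helper could look, mirroring the when/raise_error pattern of checkNonNanVectors; it is not the verbatim Spark source.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, concat, lit, raise_error, when}
import org.apache.spark.sql.types.{DoubleType, StringType}

// Sketch only: an unset or empty weightCol means every instance gets weight 1.0.
def checkNonNegativeWeights(weightCol: Option[String]): Column = weightCol match {
  case Some(w) if w.nonEmpty => checkNonNegativeWeights(w)
  case _ => lit(1.0)
}

// Sketch only: null/NaN/negative/infinite weights fail fast with a readable message.
def checkNonNegativeWeights(weightCol: String): Column = {
  val w = col(weightCol).cast(DoubleType)
  when(w.isNull || w.isNaN, raise_error(lit("Weights MUST NOT be Null or NaN")))
    .when(w < 0 || w === Double.PositiveInfinity,
      raise_error(concat(lit("Weights MUST NOT be Negative or Infinity, but got "),
        w.cast(StringType))))
    .otherwise(w)
}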
mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -22,18 +22,18 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.Since
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.functions.checkNonNegativeWeight
import org.apache.spark.ml.impl.Utils.{unpackUpperTriangular, EPSILON}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -117,7 +117,7 @@ class GaussianMixtureModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
val outputSchema = transformSchema(dataset.schema, logging = true)

val vectorCol = DatasetUtils.columnToVector(dataset, $(featuresCol))
val vectorCol = columnToVector(dataset, $(featuresCol))
var outputData = dataset
var numColsOutput = 0

@@ -392,15 +392,11 @@ class GaussianMixture @Since("2.0.0") (
seed, tol, aggregationDepth)
instr.logNumFeatures(numFeatures)

val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
checkNonNegativeWeight(col($(weightCol)).cast(DoubleType))
} else {
lit(1.0)
}

val instances = dataset.select(DatasetUtils.columnToVector(dataset, $(featuresCol)), w)
.as[(Vector, Double)].rdd
.setName("training instances")
val instances = dataset.select(
checkNonNanVectors(columnToVector(dataset, $(featuresCol))),
checkNonNegativeWeights(get(weightCol))
).as[(Vector, Double)].rdd
.setName("training instances")

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) }
18 changes: 8 additions & 10 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -23,18 +23,18 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model, PipelineStage}
import org.apache.spark.ml.functions.checkNonNegativeWeight
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.VersionUtils.majorVersion

@@ -136,7 +136,7 @@ class KMeansModel private[ml] (
val predictUDF = udf((vector: Vector) => predict(vector))

dataset.withColumn($(predictionCol),
predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)),
predictUDF(columnToVector(dataset, getFeaturesCol)),
outputSchema($(predictionCol)).metadata)
}

@@ -342,13 +342,11 @@ class KMeans @Since("1.5.0") (
.setEpsilon($(tol))
.setDistanceMeasure($(distanceMeasure))

val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
checkNonNegativeWeight(col($(weightCol)).cast(DoubleType))
} else {
lit(1.0)
}
val instances = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w)
.rdd.map { case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) }
val instances = dataset.select(
checkNonNanVectors(columnToVector(dataset, $(featuresCol))),
checkNonNegativeWeights(get(weightCol))
).rdd.map { case Row(f: Vector, w: Double) => (OldVectors.fromML(f), w)
}.setName("training instances")

val handlePersistence = dataset.storageLevel == StorageLevel.NONE
val parentModel = algo.runWithWeight(instances, handlePersistence, Some(instr))
15 changes: 7 additions & 8 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -33,6 +33,7 @@ import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
@@ -467,7 +468,7 @@ abstract class LDAModel private[ml] (
val func = getTopicDistributionMethod
val transformer = udf(func)
dataset.withColumn($(topicDistributionCol),
transformer(DatasetUtils.columnToVector(dataset, getFeaturesCol)),
transformer(columnToVector(dataset, getFeaturesCol)),
outputSchema($(topicDistributionCol)).metadata)
}

@@ -945,6 +946,7 @@ class LDA @Since("1.6.0") (
learningDecay, optimizer, learningOffset, seed)

val oldData = LDA.getOldDataset(dataset, $(featuresCol))
.setName("training instances")

// The EM solver will transform this oldData to a graph, and use an internal graphCheckpointer
// to update and cache the graph, so we do not need to cache it.
@@ -993,13 +995,10 @@ object LDA extends MLReadable[LDA] {
private[clustering] def getOldDataset(
dataset: Dataset[_],
featuresCol: String): RDD[(Long, OldVector)] = {
dataset
.select(monotonically_increasing_id(),
DatasetUtils.columnToVector(dataset, featuresCol))
.rdd
.map { case Row(docId: Long, features: Vector) =>
(docId, OldVectors.fromML(features))
}
dataset.select(
monotonically_increasing_id(),
checkNonNanVectors(columnToVector(dataset, featuresCol))
).rdd.map { case Row(docId: Long, f: Vector) => (docId, OldVectors.fromML(f)) }
}

private class LDAReader extends MLReader[LDA] {
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -21,10 +21,10 @@ import org.apache.spark.annotation.Since
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

/**
@@ -156,28 +156,28 @@ class PowerIterationClustering private[clustering] (
*/
@Since("2.4.0")
def assignClusters(dataset: Dataset[_]): DataFrame = {
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) {
lit(1.0)
} else {
SchemaUtils.checkNumericType(dataset.schema, $(weightCol))
col($(weightCol)).cast(DoubleType)
}
val spark = dataset.sparkSession
import spark.implicits._

SchemaUtils.checkColumnTypes(dataset.schema, $(srcCol), Seq(IntegerType, LongType))
SchemaUtils.checkColumnTypes(dataset.schema, $(dstCol), Seq(IntegerType, LongType))
val rdd: RDD[(Long, Long, Double)] = dataset.select(
get(weightCol) match {
case Some(w) if w.nonEmpty => SchemaUtils.checkNumericType(dataset.schema, w)
case _ =>
}

val rdd = dataset.select(
col($(srcCol)).cast(LongType),
col($(dstCol)).cast(LongType),
w).rdd.map {
case Row(src: Long, dst: Long, weight: Double) => (src, dst, weight)
}
checkNonNegativeWeights(get(weightCol))
).as[(Long, Long, Double)].rdd

val algorithm = new MLlibPowerIterationClustering()
.setK($(k))
.setInitializationMode($(initMode))
.setMaxIterations($(maxIter))
val model = algorithm.run(rdd)

import dataset.sparkSession.implicits._
model.assignments.toDF
}

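Usage illustration (not from the patch; column names and data are made up, and an active SparkSession with spark.implicits._ in scope is assumed): after this change the weights are validated in the select itself, so a negative or NaN weight fails before the MLlib algorithm runs.

import org.apache.spark.ml.clustering.PowerIterationClustering

// A tiny similarity graph: (src, dst, weight).
val edges = Seq(
  (0L, 1L, 1.0), (1L, 2L, 1.0), (2L, 3L, 0.5), (3L, 4L, 0.1)
).toDF("src", "dst", "weight")

val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setWeightCol("weight")

// Returns a DataFrame with columns "id" and "cluster".
pic.assignClusters(edges).show()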
57 changes: 34 additions & 23 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -39,11 +39,12 @@ import org.apache.spark.ml.linalg.BLAS
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DatasetUtils._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.CholeskyDecomposition
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.{DeterministicLevel, RDD}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
@@ -86,22 +87,25 @@ private[recommendation] trait ALSModelParams extends Params with HasPredictionCo
* Attempts to safely cast a user/item id to an Int. Throws an exception if the value is
* out of integer range or contains a fractional part.
*/
protected[recommendation] val checkedCast = udf { (n: Any) =>
n match {
case v: Int => v // Avoid unnecessary casting
case v: Number =>
val intV = v.intValue
protected[recommendation] def checkIntegers(dataset: Dataset[_], colName: String): Column = {
dataset.schema(colName).dataType match {
case IntegerType =>
val column = dataset(colName)
when(column.isNull, raise_error(lit(s"$colName Ids MUST NOT be Null")))
.otherwise(column)

case _: NumericType =>
val column = dataset(colName)
val casted = column.cast(IntegerType)
// Checks if number within Int range and has no fractional part.
if (v.doubleValue == intV) {
intV
} else {
throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
s"and without fractional part for columns ${$(userCol)} and ${$(itemCol)}. " +
s"Value $n was either out of Integer range or contained a fractional part that " +
s"could not be converted.")
}
case _ => throw new IllegalArgumentException(s"ALS only supports values in Integer range " +
s"for columns ${$(userCol)} and ${$(itemCol)}. Value $n was not numeric.")
when(column.isNull || column =!= casted,
raise_error(concat(
lit(s"ALS only supports non-Null values in Integer range and " +
s"without fractional part for column $colName, but got "), column)))
.otherwise(casted)

case other => throw new IllegalArgumentException(s"ALS only supports values in " +
s"Integer range for column $colName, but got type $other.")
}
}

@@ -318,11 +322,13 @@ class ALSModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema)
// create a new column named map(predictionCol) by running the predict UDF.
val validatedUsers = checkIntegers(dataset, $(userCol))
val validatedItems = checkIntegers(dataset, $(itemCol))
val predictions = dataset
.join(userFactors,
checkedCast(dataset($(userCol))) === userFactors("id"), "left")
validatedUsers === userFactors("id"), "left")
.join(itemFactors,
checkedCast(dataset($(itemCol))) === itemFactors("id"), "left")
validatedItems === itemFactors("id"), "left")
.select(dataset("*"),
predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
getColdStartStrategy match {
@@ -705,13 +711,18 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
transformSchema(dataset.schema)
import dataset.sparkSession.implicits._

val r = if ($(ratingCol) != "") col($(ratingCol)).cast(FloatType) else lit(1.0f)
val validatedUsers = checkIntegers(dataset, $(userCol))
val validatedItems = checkIntegers(dataset, $(itemCol))
val validatedRatings = if ($(ratingCol).nonEmpty) {
checkNonNanValues($(ratingCol), "Ratings").cast(FloatType)
} else {
lit(1.0f)
}

val ratings = dataset
.select(checkedCast(col($(userCol))), checkedCast(col($(itemCol))), r)
.select(validatedUsers, validatedItems, validatedRatings)
.rdd
.map { row =>
Rating(row.getInt(0), row.getInt(1), row.getFloat(2))
}
.map { case Row(u: Int, i: Int, r: Float) => Rating(u, i, r) }

instr.logPipelineStage(this)
instr.logDataset(dataset)
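To illustrate the behavioral shift in ALS (example data and session setup are assumptions, with spark.implicits._ in scope): the UDF-based checkedCast threw IllegalArgumentException from inside the UDF, whereas checkIntegers builds a plain Column with when/raise_error, so invalid ids now fail when the plan executes, with a message naming the offending column.

import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setUserCol("user").setItemCol("item").setRatingCol("rating")

// Integer-valued ids (even stored as doubles) are cast and accepted.
val ok = Seq((1.0, 10.0, 5.0f), (2.0, 11.0, 3.0f)).toDF("user", "item", "rating")
val model = als.fit(ok)

// A fractional or null id is expected to fail at execution time via raise_error,
// e.g. with a message like "ALS only supports non-Null values in Integer range
// and without fractional part for column user, but got 1.5".
val bad = Seq((1.5, 10.0, 5.0f)).toDF("user", "item", "rating")
als.fit(bad)  // expected to throw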
15 changes: 9 additions & 6 deletions mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
@@ -75,13 +75,16 @@ private[spark] object DatasetUtils {
case _ => lit(1.0)
}

private[ml] def checkNonNanVectors(vectorCol: String): Column = {
val vecCol = col(vectorCol)
when(vecCol.isNull, raise_error(lit("Vectors MUST NOT be Null")))
.when(!validateVector(vecCol),
private[ml] def checkNonNanVectors(vectorCol: Column): Column = {
when(vectorCol.isNull, raise_error(lit("Vectors MUST NOT be Null")))
.when(!validateVector(vectorCol),
raise_error(concat(lit("Vector values MUST NOT be NaN or Infinity, but got "),
vecCol.cast(StringType))))
.otherwise(vecCol)
vectorCol.cast(StringType))))
.otherwise(vectorCol)
}

private[ml] def checkNonNanVectors(vectorCol: String): Column = {
checkNonNanVectors(col(vectorCol))
}

private lazy val validateVector = udf { vector: Vector =>
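The hunk above adds a Column-based overload of checkNonNanVectors next to the existing String-based one, which is what lets the estimators validate the output of columnToVector directly. A behavioral sketch (the helper is private[ml], so this only runs from code inside org.apache.spark.ml; data is illustrative and spark.implicits._ is assumed for toDF):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.util.DatasetUtils.checkNonNanVectors
import org.apache.spark.sql.functions.col

val ok  = Seq(Tuple1(Vectors.dense(1.0, 2.0))).toDF("features")
val bad = Seq(Tuple1(Vectors.dense(1.0, Double.NaN))).toDF("features")

ok.select(checkNonNanVectors(col("features"))).collect()   // passes through unchanged
bad.select(checkNonNanVectors(col("features"))).collect()  // expected to fail roughly with:
// "Vector values MUST NOT be NaN or Infinity, but got [1.0,NaN]"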
mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -56,6 +56,11 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
assert(copiedModel.hasSummary)
}

test("BisectingKMeans validate input dataset") {
testInvalidWeights(new BisectingKMeans().setWeightCol("weight").fit(_))
testInvalidVectors(new BisectingKMeans().fit(_))
}

test("SPARK-16473: Verify Bisecting K-Means does not fail in edge case where" +
"one cluster is empty after split") {
val bkm = new BisectingKMeans()
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -87,6 +87,11 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
assert(copiedModel.hasSummary)
}

test("GaussianMixture validate input dataset") {
testInvalidWeights(new GaussianMixture().setWeightCol("weight").fit(_))
testInvalidVectors(new GaussianMixture().fit(_))
}

test("set parameters") {
val gm = new GaussianMixture()
.setK(9)
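The new tests above call shared helpers, testInvalidWeights and testInvalidVectors, which are defined elsewhere in this change and not shown in this excerpt. Purely as a hypothetical sketch of what such helpers could look like in an MLTest-style suite (ScalaTest's intercept and spark.implicits._ assumed; the real helpers may differ):

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.DataFrame

// Hypothetical: training on a dataset with a NaN feature vector should fail fast.
def testInvalidVectors(train: DataFrame => Any): Unit = {
  val df = Seq(
    (1.0, Vectors.dense(1.0, 2.0)),
    (1.0, Vectors.dense(Double.NaN, 2.0))
  ).toDF("weight", "features")
  intercept[Exception] { train(df) }
}

// Hypothetical: a negative weight should be rejected by the new validation.
def testInvalidWeights(train: DataFrame => Any): Unit = {
  val df = Seq(
    (1.0, Vectors.dense(1.0, 2.0)),
    (-1.0, Vectors.dense(3.0, 4.0))
  ).toDF("weight", "features")
  intercept[Exception] { train(df) }
}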