diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 0955d3e6e1f8..efccdda71e27 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -262,13 +262,16 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w
  * @param rank rank of the matrix factorization model
  * @param userFactors a DataFrame that stores user factors in two columns: `id` and `features`
  * @param itemFactors a DataFrame that stores item factors in two columns: `id` and `features`
+ * @param blockSize number of records per block; adjust this parameter to improve
+ *                  the efficiency of the cartesian product
  */
 @Since("1.3.0")
 class ALSModel private[ml] (
     @Since("1.4.0") override val uid: String,
     @Since("1.4.0") val rank: Int,
     @transient val userFactors: DataFrame,
-    @transient val itemFactors: DataFrame)
+    @transient val itemFactors: DataFrame,
+    @Since("2.2.0") var blockSize: Int = 4096)
   extends Model[ALSModel] with ALSModelParams with MLWritable {
 
   /** @group setParam */
@@ -283,6 +286,13 @@ class ALSModel private[ml] (
   @Since("1.3.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
+  /** @group setParam */
+  @Since("2.2.0")
+  def setBlockSize(blockSize: Int): this.type = {
+    this.blockSize = blockSize
+    this
+  }
+
   /** @group expertSetParam */
   @Since("2.2.0")
   def setColdStartStrategy(value: String): this.type = set(coldStartStrategy, value)
@@ -341,7 +351,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllUsers(numItems: Int): DataFrame = {
-    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
+    recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems, blockSize)
   }
 
   /**
@@ -352,7 +362,7 @@ class ALSModel private[ml] (
    */
   @Since("2.2.0")
   def recommendForAllItems(numUsers: Int): DataFrame = {
-    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
+    recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers, blockSize)
   }
 
   /**
@@ -375,6 +385,7 @@ class ALSModel private[ml] (
    * @param srcOutputColumn name of the column for the source ID in the output DataFrame
    * @param dstOutputColumn name of the column for the destination ID in the output DataFrame
    * @param num max number of recommendations for each record
+   * @param blockSize number of records per block
    * @return a DataFrame of (srcOutputColumn: Int, recommendations), where recommendations are
    *         stored as an array of (dstOutputColumn: Int, rating: Float) Rows.
    */
@@ -383,11 +394,12 @@ class ALSModel private[ml] (
       dstFactors: DataFrame,
       srcOutputColumn: String,
       dstOutputColumn: String,
-      num: Int): DataFrame = {
+      num: Int,
+      blockSize: Int = 4096): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
-    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])], blockSize)
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])], blockSize)
     val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
       .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
       .flatMap { case (srcIter, dstIter) =>
@@ -425,7 +437,6 @@ class ALSModel private[ml] (
 
   /**
    * Blockifies factors to improve the efficiency of cross join
-   * TODO: SPARK-20443 - expose blockSize as a param?
   */
   private def blockify(
       factors: Dataset[(Int, Array[Float])],
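Note for reviewers: a minimal sketch of how the new knob is meant to be used from the DataFrame-based API. The SparkSession setup and the tiny `ratings` set are illustrative only, not part of this patch:

```scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("als-blocksize").getOrCreate()
import spark.implicits._

// Made-up training data: (user, item, rating) triples.
val ratings = Seq((0, 0, 4.0f), (0, 1, 2.0f), (1, 1, 3.0f), (1, 2, 5.0f))
  .toDF("user", "item", "rating")

val model = new ALS()
  .setUserCol("user")
  .setItemCol("item")
  .setRatingCol("rating")
  .fit(ratings)

// Smaller blocks reduce per-task memory in the blocked cross join;
// larger blocks amortize more dot products per task. The default stays 4096.
val top10PerUser = model.setBlockSize(2048).recommendForAllUsers(10)
```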
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index ac709ad72f0c..8636d6b5c3de 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -47,6 +47,8 @@ import org.apache.spark.util.BoundedPriorityQueue
  *                     the features computed for this user.
  * @param productFeatures RDD of tuples where each tuple represents the productId
  *                        and the features computed for this product.
+ * @param blockSize number of records per block; adjust this parameter to improve
+ *                  the efficiency of the cartesian product
  *
  * @note If you create the model directly using constructor, please be aware that fast prediction
  * requires cached user/product features and their associated partitioners.
@@ -55,7 +57,8 @@ import org.apache.spark.util.BoundedPriorityQueue
 class MatrixFactorizationModel @Since("0.8.0") (
     @Since("0.8.0") val rank: Int,
     @Since("0.8.0") val userFeatures: RDD[(Int, Array[Double])],
-    @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])])
+    @Since("0.8.0") val productFeatures: RDD[(Int, Array[Double])],
+    @Since("2.2.0") var blockSize: Int = 4096)
   extends Saveable with Serializable with Logging {
 
   require(rank > 0)
@@ -215,7 +218,8 @@ class MatrixFactorizationModel @Since("0.8.0") (
    */
   @Since("1.4.0")
   def recommendProductsForUsers(num: Int): RDD[(Int, Array[Rating])] = {
-    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num).map {
+    MatrixFactorizationModel.recommendForAll(rank, userFeatures, productFeatures, num, blockSize)
+      .map {
       case (user, top) =>
         val ratings = top.map { case (product, rating) => Rating(user, product, rating) }
         (user, ratings)
@@ -233,12 +237,20 @@
    */
   @Since("1.4.0")
   def recommendUsersForProducts(num: Int): RDD[(Int, Array[Rating])] = {
-    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num).map {
+    MatrixFactorizationModel.recommendForAll(rank, productFeatures, userFeatures, num, blockSize)
+      .map {
       case (product, top) =>
         val ratings = top.map { case (user, rating) => Rating(user, product, rating) }
         (product, ratings)
     }
   }
+
+  /** Sets blockSize, which will be used by recommendForAll. */
+  @Since("2.3.0")
+  def setBlockSize(blockSize: Int): this.type = {
+    this.blockSize = blockSize
+    this
+  }
 }
 
 @Since("1.3.0")
@@ -278,6 +290,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
    * @param srcFeatures src features to receive recommendations
    * @param dstFeatures dst features used to make recommendations
    * @param num number of recommendations for each record
+   * @param blockSize number of records per block
    * @return an RDD of (srcId: Int, recommendations), where recommendations are stored as an array
    *         of (dstId, rating) pairs.
    */
@@ -285,7 +298,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       rank: Int,
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
-      num: Int): RDD[(Int, Array[(Int, Double)])] = {
+      num: Int,
+      blockSize: Int = 4096): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
+    val srcBlocks = blockify(srcFeatures, blockSize)
+    val dstBlocks = blockify(dstFeatures, blockSize)
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
@@ -313,7 +327,6 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   /**
    * Blockifies features to improve the efficiency of cartesian product
-   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
       features: RDD[(Int, Array[Double])],
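The RDD-based API gains the same knob. A sketch of the intended call pattern; the tiny `RDD[Rating]` is made up and `sc` is assumed to be an existing `SparkContext`:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.{ALS, Rating}

def demo(sc: SparkContext): Unit = {
  // Made-up ratings: (user, product, rating) triples.
  val ratings = sc.parallelize(Seq(Rating(0, 0, 4.0), Rating(0, 1, 2.0), Rating(1, 1, 3.0)))
  val model = ALS.train(ratings, 2 /* rank */, 5 /* iterations */)

  // blockSize only changes how factor vectors are grouped before the
  // cartesian product; the recommendations themselves are unaffected.
  val recs = model.setBlockSize(2048).recommendProductsForUsers(10)
}
```

Note the pass-through fix above: `recommendForAll` must forward `blockSize` to `blockify`, otherwise the new parameter is silently ignored on the mllib side.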
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 9d31e792633c..faf5153ad7be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -684,7 +684,7 @@ class ALSSuite
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numItems).toInt
       val expectedUpToN = expected.mapValues(_.slice(0, n))
-      val topItems = model.recommendForAllUsers(k)
+      val topItems = model.setBlockSize(2048).recommendForAllUsers(k)
       assert(topItems.count() == numUsers)
       assert(topItems.columns.contains("user"))
       checkRecommendations(topItems, expectedUpToN, "item")
@@ -705,7 +705,7 @@ class ALSSuite
     Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
       val expectedUpToN = expected.mapValues(_.slice(0, n))
-      val topUsers = getALSModel.recommendForAllItems(k)
+      val topUsers = getALSModel.setBlockSize(2048).recommendForAllItems(k)
       assert(topUsers.count() == numItems)
       assert(topUsers.columns.contains("item"))
       checkRecommendations(topUsers, expectedUpToN, "user")
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
index 2c8ed057a516..9bce5c45044a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -75,7 +75,7 @@ class MatrixFactorizationModelSuite extends SparkFunSuite with MLlibTestSparkCon
   test("batch predict API recommendProductsForUsers") {
     val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
     val topK = 10
-    val recommendations = model.recommendProductsForUsers(topK).collectAsMap()
+    val recommendations = model.setBlockSize(2048).recommendProductsForUsers(topK).collectAsMap()
 
     assert(recommendations(0)(0).rating ~== 17.0 relTol 1e-14)
     assert(recommendations(1)(0).rating ~== 39.0 relTol 1e-14)
@@ -84,7 +84,7 @@ class MatrixFactorizationModelSuite extends SparkFunSuite with MLlibTestSparkCon
   test("batch predict API recommendUsersForProducts") {
     val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
     val topK = 10
-    val recommendations = model.recommendUsersForProducts(topK).collectAsMap()
+    val recommendations = model.setBlockSize(2048).recommendUsersForProducts(topK).collectAsMap()
 
     assert(recommendations(2)(0).user == 1)
     assert(recommendations(2)(0).rating ~== 39.0 relTol 1e-14)
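For context on what these tests exercise: `blockify` is just a grouped-iterator pass over partitions, so a non-default value such as 2048 changes only the grouping, never the ratings, which is why the expected values above are unchanged. A standalone sketch of the same idea (hypothetical helper, not part of this patch):

```scala
import org.apache.spark.rdd.RDD

// Group (id, features) records into fixed-size blocks so the downstream
// cartesian product pairs blocks, roughly (n / blockSize)^2 tasks,
// instead of pairing individual records.
def blockifySketch(
    features: RDD[(Int, Array[Double])],
    blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
  features.mapPartitions(_.grouped(blockSize))
}
```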
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3cc089dcede3..7e85d40b1c49 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -1008,6 +1008,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.RandomForestClassificationModel.setFeatureSubsetStrategy"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.numTrees"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.RandomForestRegressionModel.setFeatureSubsetStrategy")
+    ) ++ Seq(
+      // [SPARK-20443] The blockSize of MLlib ALS should be settable by the user
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.recommendation.ALSModel.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.recommendation.MatrixFactorizationModel.this")
     )
   }
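These two filters are required because adding a constructor parameter changes the constructor's JVM signature even when a Scala default is supplied: defaults are resolved at the call site, so binaries compiled against the old constructor fail to link with `NoSuchMethodError`. A schematic illustration (hypothetical classes, not from this patch):

```scala
// Schematic only: `Old` models the pre-patch class shape, `New` the post-patch one.
class Old(val a: Int)                            // emits <init>(I)V
class New(val a: Int, var blockSize: Int = 4096) // emits only <init>(II)V
// Scala emits a synthetic default getter for the second argument instead of
// keeping the old one-argument constructor, so MiMa reports the vanished
// <init> as a DirectMissingMethodProblem.
```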