From 4fd11b9eb5c21939adf1d104ea2d80de25af1542 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Wed, 3 May 2017 11:36:28 +0200
Subject: [PATCH 1/4] First cut

---
 .../apache/spark/ml/recommendation/ALS.scala | 69 +++++++++++++++++--
 1 file changed, 63 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef7244666..f5c42db05f11 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{OpenHashMap, OpenHashSet, SortDataFormat, Sorter}
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -356,6 +356,22 @@ class ALSModel private[ml] (
 
   /**
    * Makes recommendations for all users (or items).
+   *
+   * Note: the previous approach to computing top-k recommendations used a cross-join
+   * followed by predicting a score for each row of the joined dataset.
+   * However, this results in exploding the size of intermediate data. While Spark SQL makes it
+   * relatively efficient, the approach implemented here is significantly more efficient.
+   *
+   * This approach groups factors into blocks and computes the top-k elements per block,
+   * using Level 1 BLAS (dot) and an efficient BoundedPriorityQueue. It then computes the
+   * global top-k by aggregating the per-block top-k elements with a [[TopByKeyAggregator]].
+   * This significantly reduces the size of intermediate and shuffle data.
+   * This is the DataFrame equivalent to the approach used in
+   * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
+   *
+   * Compared with BLAS.dot, the hand-written version used below is more efficient than a call
+   * to the native BLAS backend and has the same performance as the fallback F2jBLAS backend.
+   *
    * @param srcFactors src factors for which to generate recommendations
    * @param dstFactors dst factors used to make recommendations
    * @param srcOutputColumn name of the column for the source ID in the output DataFrame
@@ -372,11 +388,41 @@ class ALSModel private[ml] (
       num: Int): DataFrame = {
     import srcFactors.sparkSession.implicits._
 
-    val ratings = srcFactors.crossJoin(dstFactors)
-      .select(
-        srcFactors("id"),
-        dstFactors("id"),
-        predict(srcFactors("features"), dstFactors("features")))
+    val srcFactorsBlocked = blockify(srcFactors.as[(Int, Array[Float])])
+    val dstFactorsBlocked = blockify(dstFactors.as[(Int, Array[Float])])
+    val ratings = srcFactorsBlocked.crossJoin(dstFactorsBlocked)
+      .as[(Seq[(Int, Array[Float])], Seq[(Int, Array[Float])])]
+      .flatMap { case (srcIter, dstIter) =>
+        val m = srcIter.size
+        val n = math.min(dstIter.size, num)
+        val output = new Array[(Int, Int, Float)](m * n)
+        var j = 0
+        srcIter.foreach { case (srcId, srcFactor) =>
+          val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
+          dstIter.foreach { case (dstId, dstFactor) =>
+            /**
+             * The below code is equivalent to
+             *    val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)
+             */
+            var score = 0.0f
+            var k = 0
+            while (k < rank) {
+              score += srcFactor(k) * dstFactor(k)
+              k += 1
+            }
+            pq += { (dstId, score) }
+          }
+          val pqIter = pq.iterator
+          var i = 0
+          while (i < n) {
+            val (dstId, score) = pqIter.next()
+            output(j + i) = (srcId, dstId, score)
+            i +=1
+          }
+          j +=1
+        }
+        output.toSeq
+      }
     // We'll force the IDs to be Int. Unfortunately this converts IDs to Int in the output.
     val topKAggregator = new TopByKeyAggregator[Int, Int, Float](num, Ordering.by(_._2))
     val recs = ratings.as[(Int, Int, Float)].groupByKey(_._1).agg(topKAggregator.toColumn)
@@ -389,6 +435,17 @@ class ALSModel private[ml] (
     )
     recs.select($"id" as srcOutputColumn, $"recommendations" cast arrayType)
   }
+
+  /**
+   * Blockifies factors to improve the efficiency of the cross join
+   */
+  private def blockify(
+    factors: Dataset[(Int, Array[Float])],
+    /* TODO make blockSize a param? */blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+    import factors.sparkSession.implicits._
+    factors.mapPartitions(_.grouped(blockSize))
+  }
+
 }
 
 @Since("1.6.0")

From 29d67774c88dd5c86ff8c5669b0328faa842239a Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Wed, 3 May 2017 12:17:00 +0200
Subject: [PATCH 2/4] Fix increment error

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index f5c42db05f11..2daaebbfc8c9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -417,9 +417,9 @@ class ALSModel private[ml] (
           while (i < n) {
             val (dstId, score) = pqIter.next()
             output(j + i) = (srcId, dstId, score)
-            i +=1
+            i += 1
           }
-          j +=1
+          j += n
         }
         output.toSeq
       }

From baeadd0403434a12d3868c75c1496b562f560d59 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Wed, 3 May 2017 21:05:25 +0200
Subject: [PATCH 3/4] Move PQ outside of foreach and update comments

---
 .../org/apache/spark/ml/recommendation/ALS.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2daaebbfc8c9..8bbf90e172a7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -363,15 +363,12 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements per block,
-   * using Level 1 BLAS (dot) and an efficient BoundedPriorityQueue. It then computes the
+   * using Level 1 BLAS (dot) and an efficient [[BoundedPriorityQueue]]. It then computes the
    * global top-k by aggregating the per-block top-k elements with a [[TopByKeyAggregator]].
    * This significantly reduces the size of intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
    * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
    *
-   * Compared with BLAS.dot, the hand-written version used below is more efficient than a call
-   * to the native BLAS backend and has the same performance as the fallback F2jBLAS backend.
-   *
    * @param srcFactors src factors for which to generate recommendations
    * @param dstFactors dst factors used to make recommendations
    * @param srcOutputColumn name of the column for the source ID in the output DataFrame
@@ -397,12 +394,15 @@ class ALSModel private[ml] (
         val n = math.min(dstIter.size, num)
         val output = new Array[(Int, Int, Float)](m * n)
         var j = 0
+        val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
-          val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
           dstIter.foreach { case (dstId, dstFactor) =>
             /**
              * The below code is equivalent to
              *    val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)
+             * Compared with BLAS.dot, the hand-written version used below is more efficient than
+             * a call to the native BLAS backend and has the same performance as the fallback
+             * F2jBLAS backend.
              */
             var score = 0.0f
             var k = 0
             while (k < rank) {
               score += srcFactor(k) * dstFactor(k)
               k += 1
             }
@@ -420,6 +420,7 @@ class ALSModel private[ml] (
             i += 1
           }
           j += n
+          pq.clear()
         }
         output.toSeq
       }

From cf35eead9ce0d4832a0df5de22722b47e4017bf7 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Thu, 4 May 2017 09:22:16 +0200
Subject: [PATCH 4/4] Address review style comments

---
 .../apache/spark/ml/recommendation/ALS.scala | 23 +++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 8bbf90e172a7..4a130e1089a8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -363,9 +363,9 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements per block,
-   * using Level 1 BLAS (dot) and an efficient [[BoundedPriorityQueue]]. It then computes the
-   * global top-k by aggregating the per-block top-k elements with a [[TopByKeyAggregator]].
-   * This significantly reduces the size of intermediate and shuffle data.
+   * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+   * It then computes the global top-k by aggregating the per-block top-k elements with
+   * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
    * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
    *
@@ -397,12 +397,10 @@ class ALSModel private[ml] (
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
-            /**
+            /*
              * The below code is equivalent to
-             *    val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)
-             * Compared with BLAS.dot, the hand-written version used below is more efficient than
-             * a call to the native BLAS backend and has the same performance as the fallback
-             * F2jBLAS backend.
+             *    `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
+             * This handwritten version is at least as efficient as BLAS calls in this case.
              */
             var score = 0.0f
             var k = 0
@@ -410,7 +408,7 @@ class ALSModel private[ml] (
               score += srcFactor(k) * dstFactor(k)
               k += 1
             }
-            pq += { (dstId, score) }
+            pq += dstId -> score
           }
           val pqIter = pq.iterator
           var i = 0
@@ -434,15 +432,16 @@ class ALSModel private[ml] (
         .add(dstOutputColumn, IntegerType)
         .add("rating", FloatType)
     )
-    recs.select($"id" as srcOutputColumn, $"recommendations" cast arrayType)
+    recs.select($"id".as(srcOutputColumn), $"recommendations".cast(arrayType))
   }
 
   /**
    * Blockifies factors to improve the efficiency of the cross join
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
   private def blockify(
-    factors: Dataset[(Int, Array[Float])],
-    /* TODO make blockSize a param? */blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
+      factors: Dataset[(Int, Array[Float])],
+      blockSize: Int = 4096): Dataset[Seq[(Int, Array[Float])]] = {
     import factors.sparkSession.implicits._
     factors.mapPartitions(_.grouped(blockSize))
   }
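
Notes on the blocked top-k computation (illustration only, not part of the patches above): the per-block step is easy to prototype outside Spark. The sketch below mirrors the final state of the series - factors are grouped into fixed-size blocks, every (src block, dst block) pair is scored with a hand-rolled dot product, and a size-capped priority queue keeps the best `num` candidates per source. Spark's BoundedPriorityQueue is private[spark], so a plain scala.collection.mutable.PriorityQueue with an explicit cap stands in for it here; BlockedTopK, blockify, dot and blockTopK are illustrative names, not APIs from the patch.

import scala.collection.mutable

object BlockedTopK {

  // Group (id, factor) pairs into fixed-size blocks: the plain-Scala core of
  // blockify, i.e. factors.mapPartitions(_.grouped(blockSize)).
  def blockify(
      factors: Iterator[(Int, Array[Float])],
      blockSize: Int = 4096): Iterator[Seq[(Int, Array[Float])]] =
    factors.grouped(blockSize)

  // Hand-rolled dot product, equivalent to blas.sdot(rank, src, 1, dst, 1).
  def dot(src: Array[Float], dst: Array[Float]): Float = {
    var score = 0.0f
    var k = 0
    while (k < src.length) {
      score += src(k) * dst(k)
      k += 1
    }
    score
  }

  // Top `num` (dstId, score) pairs per source id within one (src block, dst block)
  // pair. A capped min-heap (smallest retained score at the head) stands in for
  // Spark's private BoundedPriorityQueue.
  def blockTopK(
      srcBlock: Seq[(Int, Array[Float])],
      dstBlock: Seq[(Int, Array[Float])],
      num: Int): Seq[(Int, Int, Float)] = {
    val minFirst = Ordering.by[(Int, Float), Float](_._2).reverse
    val pq = mutable.PriorityQueue.empty[(Int, Float)](minFirst)
    srcBlock.flatMap { case (srcId, srcFactor) =>
      dstBlock.foreach { case (dstId, dstFactor) =>
        val candidate = (dstId, dot(srcFactor, dstFactor))
        if (pq.size < num) pq += candidate
        else if (candidate._2 > pq.head._2) { pq.dequeue(); pq += candidate }
      }
      // Draining the heap empties it for the next source (the pq.clear() of the
      // patch) and yields ascending scores, so reverse for best-first order.
      pq.dequeueAll.reverse.map { case (dstId, score) => (srcId, dstId, score) }
    }
  }
}

For example, with rank-2 factors:

val src = Seq(1 -> Array(1.0f, 0.0f), 2 -> Array(0.0f, 1.0f))
val dst = Seq(10 -> Array(0.9f, 0.1f), 11 -> Array(0.1f, 0.9f), 12 -> Array(0.5f, 0.5f))
BlockedTopK.blockTopK(src, dst, num = 2)
// Seq((1,10,0.9), (1,12,0.5), (2,11,0.9), (2,12,0.5))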
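
The global step can be pictured the same way. TopByKeyAggregator is package-private to Spark ML, but per key it effectively merges candidate lists and truncates to the k best. A rough local equivalent of ratings.groupByKey(_._1).agg(topKAggregator.toColumn), assuming the candidates from all block pairs have been collected into one Seq (the real aggregator merges incrementally in a bounded structure rather than sorting):

// Merge the per-block candidates for every source id and keep only the `num`
// highest-scoring (dstId, score) pairs, as the aggregation step does.
def globalTopK(
    candidates: Seq[(Int, Int, Float)],
    num: Int): Map[Int, Seq[(Int, Float)]] =
  candidates
    .groupBy(_._1)
    .map { case (srcId, rows) =>
      srcId -> rows.map(r => (r._2, r._3)).sortBy(-_._2).take(num)
    }

This is also why the approach shrinks the shuffle: only the per-block top-k candidates survive the map side, instead of every (src, dst) score from the cross join.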