Skip to content
Closed
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.BoundedPriorityQueue

/**
* Model representing the result of matrix factorization.
Expand Down Expand Up @@ -276,44 +277,53 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
/**
* Use dot to replace blas 3 gemm is the key approach to improve efficiency.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should say something like:

The previous approach used for computing top-k recommendations aimed to group individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could be used for efficiency. However, this causes excessive GC pressure due to the large arrays required for intermediate result storage, as well as a high sensitivity to the block size used.

The following approach still groups factors into blocks, but instead computes the top-k elements per block, using Level 1 BLAS (dot) and an efficient BoundedPriorityQueue. This avoids any large intermediate datastructures and results in significantly reduced GC pressure as well as shuffle data, which far outweighs any cost incurred from not using Level 3 BLAS operations.

* By this change, we can get the topK elements of each block to reduce the GC time.
* Comparing with BLAS.dot, hand-written dot is high efficiency.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Compared with BLAS.dot, the hand-written version below is more efficient than a call to the native BLAS backend and the same performance as the fallback F2jBLAS backend.

*/
val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to more detail to the doc string comment for this method to explain the approach used for efficiency.

val m = srcIter.size
val n = math.min(dstIter.size, num)
val output = new Array[(Int, (Int, Double))](m * n)
var j = 0
srcIter.foreach { case (srcId, srcFactor) =>
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: there are several 4-space indents here that should be 2

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @srowen

dstIter.foreach { case (dstId, dstFactor) =>
/**
* The below code is equivalent to
* val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)
*/
var score: Double = 0
var k = 0
while (k < rank) {
score += srcFactor(k) * dstFactor(k)
k += 1
}
pq += ((dstId, score))
}
val pqIter = pq.iterator
var i = 0
while (i < n) {
output(j + i) = (srcId, pqIter.next())
i += 1
}
j += n
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}

/**
* Blockifies features to use Level-3 BLAS.
* Blockifies features to improve the efficiency of cartesian product
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should adjust the comment here as we're not using Level-3 BLAS any more.

private def blockify(
rank: Int,
features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
features: RDD[(Int, Array[Double])]): RDD[Seq[(Int, Array[Double])]] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove rank argument here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's match what I've done in https://github.com/apache/spark/pull/17845/files#diff-be65dd1d6adc53138156641b610fcadaR440 - i.e. blockSize as an argument, with a TODO: SPARK-20443 - expose blockSize as a param?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, it seems to me that the performance can be less sensitive to blockSize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, less sensitive. See https://issues.apache.org/jira/browse/SPARK-20443. It may be that we make the block size tunable - or by experiments set a block size that seems generally optimal (2048 in those experiments seems best).

But we would need to perform experiments over a wide range of data sizes (and check both recommendForAllUsers and recommendForAllItems performance).

val blockSize = 4096 // TODO: tune the block size
val blockStorage = rank * blockSize
features.mapPartitions { iter =>
iter.grouped(blockSize).map { grouped =>
val ids = mutable.ArrayBuilder.make[Int]
ids.sizeHint(blockSize)
val factors = mutable.ArrayBuilder.make[Double]
factors.sizeHint(blockStorage)
var i = 0
grouped.foreach { case (id, factor) =>
ids += id
factors ++= factor
i += 1
}
(ids.result(), new DenseMatrix(rank, i, factors.result()))
}
iter.grouped(blockSize)
}
}

Expand Down