@@ -20,6 +20,8 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
+import scala.collection.mutable
+
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
@@ -31,7 +33,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.BLAS
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel]
       srcFeatures: RDD[(Int, Array[Double])],
       dstFeatures: RDD[(Int, Array[Double])],
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
-    val srcBlocks = blockify(srcFeatures)
-    val dstBlocks = blockify(dstFeatures)
-    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
-      val m = srcIter.size
-      val n = math.min(dstIter.size, num)
-      val output = new Array[(Int, (Int, Double))](m * n)
-      var i = 0
-      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
-      srcIter.foreach { case (srcId, srcFactor) =>
-        dstIter.foreach { case (dstId, dstFactor) =>
-          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
-          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
-          pq += dstId -> score
-        }
-        pq.foreach { case (dstId, score) =>
-          output(i) = (srcId, (dstId, score))
-          i += 1
-        }
-        pq.clear()
-      }
-      output.toSeq
-    }
-    ratings.topByKey(num)(Ordering.by(_._2))
+    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
+    val dstBlocks = blockify(rank, dstFeatures)
+    val ratings = srcBlocks.cartesian(dstBlocks).map {
+      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
+        val m = srcIds.length
+        val n = dstIds.length
+        val dstIdMatrix = new Array[Int](m * num)
+        val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity)
+        val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))
+
+        val ratings = srcFactors.transpose.multiply(dstFactors)
+        var i = 0
+        var j = 0
+        while (i < m) {
+          var k = 0
+          while (k < n) {
+            pq += dstIds(k) -> ratings(i, k)
+            k += 1
+          }
+          k = 0
+          pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
+            dstIdMatrix(j + k) = id
+            scoreMatrix(j + k) = score
+            k += 1
+          }
+          // pq.size may be less than num (corner case); unfilled slots keep NegativeInfinity
+          j += num
+          i += 1
+          pq.clear()
+        }
+        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true)))
+    }
+    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
+      (rateSum, rate) => mergeFunc(rateSum, rate, num),
+      (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
+    ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
+      // to avoid the corner case where the number of items is less than the recommendation num
+      var col: Int = 0
+      while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
+        col += 1
+      }
+      val row = scoreMatrix.numRows
+      val output = new Array[(Int, Array[(Int, Double)])](row)
+      var i = 0
+      while (i < row) {
+        val factors = new Array[(Int, Double)](col)
+        var j = 0
+        while (j < col) {
+          factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
+          j += 1
+        }
+        output(i) = (srcIds(i), factors)
+        i += 1
+      }
+      output.toSeq
+    }
   }

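The core of the new path, visible above: instead of one `ddot` call per (src, dst) pair, each pair of blocks is scored with a single matrix multiply (a BLAS level-3 call instead of m·n rank-length dot products), and a bounded priority queue keeps only the top `num` scores per source row. A minimal, self-contained sketch of that block-level scoring with toy data (the object name is mine; an illustration, not code from this PR):

```scala
import org.apache.spark.mllib.linalg.DenseMatrix

object BlockScoreSketch {
  def main(args: Array[String]): Unit = {
    val rank = 2
    // Column-major: each column is one factor vector, as blockify lays them out.
    val srcFactors = new DenseMatrix(rank, 2, Array(1.0, 0.0, 0.5, 0.5))           // 2 users
    val dstFactors = new DenseMatrix(rank, 3, Array(1.0, 1.0, 0.0, 2.0, 1.0, 0.0)) // 3 items
    // (m x rank) * (rank x n): entry (i, k) is the dot product of user i and item k.
    val scores = srcFactors.transpose.multiply(dstFactors)
    for (i <- 0 until scores.numRows) {
      // Rank each row's scores and keep the 2 best, as the priority queue does.
      val ranked = (0 until scores.numCols).map(k => (k, scores(i, k))).sortBy(-_._2)
      println(s"user $i top 2: " + ranked.take(2).mkString(", "))
    }
  }
}
```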
+
+  private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
+      rate: (Array[Int], Array[Int], DenseMatrix),
+      num: Int): (Array[Int], Array[Int], DenseMatrix) = {
+    if (rateSum._1 == null) {
+      rate
+    } else {
+      val row = rateSum._3.numRows
+      var i = 0
+      val tempIdMatrix = new Array[Int](row * num)
+      val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity)
+      while (i < row) {
+        var j = 0
+        var sum_index = 0
+        var rate_index = 0
+        val matrixIndex = i * num
+        while (j < num) {
+          if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
+            tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index)
+            tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
+            rate_index += 1
+          } else {
+            tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index)
+            tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
+            sum_index += 1
+          }
+          j += 1
+        }
+        i += 1
+      }
+      (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix, true))
+    }
+  }

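`mergeFunc` above is effectively one merge step of mergesort: each partial result carries, per source row, a top-`num` candidate list already sorted by descending score, and the two-pointer walk keeps the `num` best of the two lists. A toy rehearsal of that merge on plain arrays (hypothetical names; like the PR's `NegativeInfinity` padding, it assumes each input holds at least `num` entries):

```scala
object MergeSketch {
  // Merge two descending-sorted (itemId, score) lists, keeping the `num` best.
  def merge(a: Array[(Int, Double)], b: Array[(Int, Double)], num: Int): Array[(Int, Double)] = {
    val out = new Array[(Int, Double)](num)
    var ai = 0
    var bi = 0
    var j = 0
    while (j < num) {
      // Take the larger head, exactly as mergeFunc compares rate._3 and rateSum._3.
      if (a(ai)._2 > b(bi)._2) { out(j) = a(ai); ai += 1 }
      else { out(j) = b(bi); bi += 1 }
      j += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val fromBlockA = Array(11 -> 0.9, 12 -> 0.4) // (itemId, score), sorted descending
    val fromBlockB = Array(21 -> 0.7, 22 -> 0.6)
    println(merge(fromBlockA, fromBlockB, 2).mkString(", ")) // (11,0.9), (21,0.7)
  }
}
```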
   /**
    * Blockifies features to improve the efficiency of cartesian product
+   * TODO: SPARK-20443 - expose blockSize as a param?
    */
-  private def blockify(
-      features: RDD[(Int, Array[Double])],
-      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
+  def blockify(
+      rank: Int,
+      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
+    val blockSize = 2000 // TODO: tune the block size
+    val blockStorage = rank * blockSize
     features.mapPartitions { iter =>
-      iter.grouped(blockSize)
+      iter.grouped(blockSize).map { grouped =>
+        val ids = mutable.ArrayBuilder.make[Int]
+        ids.sizeHint(blockSize)
+        val factors = mutable.ArrayBuilder.make[Double]
+        factors.sizeHint(blockStorage)
+        var i = 0
+        grouped.foreach { case (id, factor) =>
+          ids += id
+          factors ++= factor
+          i += 1
+        }
+        (ids.result(), new DenseMatrix(rank, i, factors.result()))
+      }
     }
   }

Review thread on the `val blockSize = 2000` line:

Contributor: So will you add a parameter for this?

Author: Yes, we have another PR to set this value: SPARK-20443. If the blockSize is too large, it is possible to OOM. In my tests, with blockSize set from 1000 to 8000, this PR performs better than master, and performance is about the same across that range.

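For a concrete picture of the new `blockify` output: each group of `(id, factor)` pairs becomes an id array plus a `rank x n` column-major `DenseMatrix` whose column `k` is the factor vector of `ids(k)`. A small driver-side rehearsal with toy data (hypothetical object name, not PR code):

```scala
import scala.collection.mutable
import org.apache.spark.mllib.linalg.DenseMatrix

object BlockifySketch {
  def main(args: Array[String]): Unit = {
    val rank = 2
    val group = Seq(7 -> Array(1.0, 2.0), 9 -> Array(3.0, 4.0))
    val ids = mutable.ArrayBuilder.make[Int]
    val factors = mutable.ArrayBuilder.make[Double]
    var n = 0
    group.foreach { case (id, factor) =>
      ids += id        // collect ids in order
      factors ++= factor // append factor values column by column
      n += 1
    }
    // rank x n, column-major: column k holds the factor vector of ids(k).
    val block = (ids.result(), new DenseMatrix(rank, n, factors.result()))
    println(block._1.mkString(", ")) // 7, 9
    println(block._2)                // columns (1.0, 2.0) and (3.0, 4.0)
  }
}
```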