Closed

Changes from 3 commits
28 changes: 9 additions & 19 deletions mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -27,6 +27,7 @@ import scala.util.{Sorting, Try}
import scala.util.hashing.byteswap64

import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import com.github.fommil.netlib.F2jBLAS
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
@@ -363,7 +364,7 @@ class ALSModel private[ml] (
* relatively efficient, the approach implemented here is significantly more efficient.
*
* This approach groups factors into blocks and computes the top-k elements per block,
- * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
+ * using dot product and an efficient [[BoundedPriorityQueue]] (instead of gemm).
* It then computes the global top-k by aggregating the per block top-k elements with
* a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
* This is the DataFrame equivalent to the approach used in
@@ -393,31 +394,18 @@ class ALSModel private[ml] (
val m = srcIter.size
val n = math.min(dstIter.size, num)
val output = new Array[(Int, Int, Float)](m * n)
-var j = 0
+var i = 0
val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
srcIter.foreach { case (srcId, srcFactor) =>
dstIter.foreach { case (dstId, dstFactor) =>
-/*
- * The below code is equivalent to
- * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
- * This handwritten version is as or more efficient as BLAS calls in this case.
- */
-var score = 0.0f
-var k = 0
-while (k < rank) {
-score += srcFactor(k) * dstFactor(k)
-k += 1
-}
+// We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+val score = ALSModel._f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
pq += dstId -> score
}
-val pqIter = pq.iterator
-var i = 0
-while (i < n) {
-val (dstId, score) = pqIter.next()
-output(j + i) = (srcId, dstId, score)
+pq.foreach { case (dstId, score) =>
+output(i) = (srcId, dstId, score)
i += 1
}
-j += n
pq.clear()
}
output.toSeq
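For readers who want to experiment with the pattern outside Spark, here is a minimal, self-contained sketch of the per-block top-k idea described in the scaladoc above. Spark's [[BoundedPriorityQueue]] is package-private, so this sketch substitutes scala.collection.mutable.PriorityQueue, and `topKPerBlock` is a hypothetical helper name, not part of the patch:

```scala
import scala.collection.mutable

// For one (srcBlock, dstBlock) pair, keep only the top `num` (dstId, score)
// pairs per src factor, instead of materialising the full m x n score matrix
// that a gemm-based approach would need.
def topKPerBlock(
    srcBlock: Seq[(Int, Array[Float])],
    dstBlock: Seq[(Int, Array[Float])],
    rank: Int,
    num: Int): Seq[(Int, Int, Float)] = {
  val output = mutable.ArrayBuffer.empty[(Int, Int, Float)]
  // Reverse the ordering so the queue is a min-heap on score: the weakest
  // retained candidate sits at the head and is evicted first.
  val pq = mutable.PriorityQueue.empty[(Int, Float)](
    Ordering.by[(Int, Float), Float](_._2).reverse)
  srcBlock.foreach { case (srcId, srcFactor) =>
    dstBlock.foreach { case (dstId, dstFactor) =>
      var score = 0.0f
      var k = 0
      while (k < rank) {
        score += srcFactor(k) * dstFactor(k)
        k += 1
      }
      pq.enqueue(dstId -> score)
      if (pq.size > num) pq.dequeue() // bound the queue at num entries
    }
    pq.foreach { case (dstId, score) => output += ((srcId, dstId, score)) }
    pq.clear()
  }
  output.toSeq
}
```

Only O(num) candidate scores are live at any time, which is what keeps GC pressure low compared with an m x n intermediate array.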
@@ -451,6 +439,8 @@ class ALSModel private[ml] (
@Since("1.6.0")
object ALSModel extends MLReadable[ALSModel] {

+@transient private[recommendation] val _f2jBLAS = new F2jBLAS
Member:
Does this require significant initialization? You could use org.apache.spark.ml.linalg.BLAS.f2jBLAS

Contributor Author:
No more or less than using ml.linalg.BLAS - I did think of that but the var needs to be exposed as private[ml]. If we're ok with that then it'll be slightly cleaner to use that, yes.
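As a side note on the discussion above: F2jBLAS is the pure-JVM (f2j-translated) implementation from netlib-java, so constructing it does not load any native library. A minimal sketch of the call the patch introduces, assuming netlib-java's `com.github.fommil.netlib` classes are on the classpath:

```scala
import com.github.fommil.netlib.F2jBLAS

object SdotExample {
  // Pure-JVM BLAS: no native library loading or initialization on construction.
  private val f2jBLAS = new F2jBLAS

  def main(args: Array[String]): Unit = {
    val x = Array(1.0f, 2.0f, 3.0f)
    val y = Array(4.0f, 5.0f, 6.0f)
    // sdot(n, x, incx, y, incy): single-precision dot product with stride 1.
    val score = f2jBLAS.sdot(x.length, x, 1, y, 1)
    println(score) // 1*4 + 2*5 + 3*6 = 32.0
  }
}
```

For a single Level 1 operation like a dot product, the JNI round trip of a native BLAS typically costs more than the arithmetic itself, which is why the patch prefers F2jBLAS here.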


private val NaN = "nan"
private val Drop = "drop"
private[recommendation] final val supportedColdStartStrategies = Array(NaN, Drop)
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,10 +20,9 @@ package org.apache.spark.mllib.recommendation
import java.io.IOException
import java.lang.{Integer => JavaInteger}

-import scala.collection.mutable
-
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import com.github.fommil.netlib.F2jBLAS
import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.JsonDSL._
@@ -33,7 +32,6 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
@@ -248,6 +246,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

import org.apache.spark.mllib.util.Loader._

+@transient private val _f2jBLAS = new F2jBLAS
+
/**
* Makes recommendations for a single user (or product).
*/
@@ -263,6 +263,19 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

/**
* Makes recommendations for all users (or products).
+ *
+ * Note: the previous approach used for computing top-k recommendations aimed to group
+ * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
+ * be used for efficiency. However, this causes excessive GC pressure due to the large
+ * arrays required for intermediate result storage, as well as a high sensitivity to the
+ * block size used.
+ *
+ * The following approach still groups factors into blocks, but instead computes the
+ * top-k elements per block, using dot product and an efficient [[BoundedPriorityQueue]]
+ * (instead of gemm). This avoids any large intermediate data structures and results
+ * in significantly reduced GC pressure as well as shuffle data, which far outweighs
+ * any cost incurred from not using Level 3 BLAS operations.
+ *
* @param rank rank
* @param srcFeatures src features to receive recommendations
* @param dstFeatures dst features used to make recommendations
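To make the storage argument in the new scaladoc concrete, here is a back-of-the-envelope comparison, assuming blockify's default block size of 4096 and top-10 recommendations; the numbers are illustrative only:

```scala
// One block pair under the gemm approach needs an m x n scores matrix:
val m = 4096   // src factors per block (assumed blockify size)
val n = 4096   // dst factors per block
val num = 10   // top-k to keep per src id

val gemmDoubles = m.toLong * n   // 16,777,216 doubles ~ 128 MB per block pair
val queueEntries = num           // dot + queue: only ~10 live entries per src row
```

Allocating and discarding a scratch array of that size for every block pair is what produced the GC pressure and block-size sensitivity described above; the bounded queue keeps the working set effectively constant.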
@@ -277,46 +290,22 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(srcFeatures)
val dstBlocks = blockify(dstFeatures)
-/**
- * The previous approach used for computing top-k recommendations aimed to group
- * individual factor vectors into blocks, so that Level 3 BLAS operations (gemm) could
- * be used for efficiency. However, this causes excessive GC pressure due to the large
- * arrays required for intermediate result storage, as well as a high sensitivity to the
- * block size used.
- * The following approach still groups factors into blocks, but instead computes the
- * top-k elements per block, using a simple dot product (instead of gemm) and an efficient
- * [[BoundedPriorityQueue]]. This avoids any large intermediate data structures and results
- * in significantly reduced GC pressure as well as shuffle data, which far outweighs
- * any cost incurred from not using Level 3 BLAS operations.
- */
val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
val m = srcIter.size
val n = math.min(dstIter.size, num)
val output = new Array[(Int, (Int, Double))](m * n)
-var j = 0
+var i = 0
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
srcIter.foreach { case (srcId, srcFactor) =>
dstIter.foreach { case (dstId, dstFactor) =>
-/*
- * The below code is equivalent to
- * `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
- * This handwritten version is as or more efficient as BLAS calls in this case.
- */
-var score: Double = 0
-var k = 0
-while (k < rank) {
-score += srcFactor(k) * dstFactor(k)
-k += 1
-}
+// We use F2jBLAS which is faster than a call to native BLAS for vector dot product
+val score = _f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
pq += dstId -> score
}
-val pqIter = pq.iterator
-var i = 0
-while (i < n) {
-output(j + i) = (srcId, pqIter.next())
+pq.foreach { case (dstId, score) =>
+output(i) = (srcId, (dstId, score))
i += 1
}
-j += n
pq.clear()
}
output.toSeq