From 767d72e9bd08cc251aa1df0ce56a4fb8deb9e2e6 Mon Sep 17 00:00:00 2001 From: David Eis Date: Tue, 2 May 2017 21:46:29 +0000 Subject: [PATCH 1/2] Fix regression introduced by ccafd757eda478913f783f3127be715bf6413740 Revert the handling of negative values in ALS with implicit feedback and test for regression. --- .../apache/spark/ml/recommendation/ALS.scala | 24 +++++---- .../spark/ml/recommendation/ALSSuite.scala | 50 ++++++++++++++++++- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 0955d3e6e1f8f..0604a7c2ff6be 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { /** * Representing a normal equation to solve the following weighted least squares problem: * - * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x. + * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x. * * Its normal equation is given by * - * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0. + * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0. + * + * Distributing and letting b,,i,, = d,,i,, * b,,i,, + * + * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0. */ private[recommendation] class NormalEquation(val k: Int) extends Serializable { @@ -795,8 +799,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { require(a.length == k) copyToDouble(a) blas.dspr(upper, k, c, da, 1, ata) - if (b != 0.0) { - blas.daxpy(k, c * b, da, 1, atb, 1) + if (Math.abs(b) > Double.MinPositiveValue) { + blas.daxpy(k, b, da, 1, atb, 1) } this } @@ -1624,15 +1628,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { val srcFactor = sortedSrcFactors(blockId)(localIndex) val rating = ratings(i) if (implicitPrefs) { - // Extension to the original paper to handle b < 0. confidence is a function of |b| - // instead so that it is never negative. c1 is confidence - 1.0. + // Extension to the original paper to handle rating < 0. confidence is a function + // of |rating| instead so that it is never negative. c1 is confidence - 1. val c1 = alpha * math.abs(rating) - // For rating <= 0, the corresponding preference is 0. So the term below is only added - // for rating > 0. Because YtY is already added, we need to adjust the scaling here. - if (rating > 0) { + // For rating <= 0, the corresponding preference is 0. So the second argument of add + // is only there for rating > 0. + if (rating > 0.0) { numExplicits += 1 - ls.add(srcFactor, (c1 + 1.0) / c1, c1) } + ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1) } else { ls.add(srcFactor, rating) numExplicits += 1 diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index 9d31e792633cd..701040f2d6041 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._ import org.apache.spark.ml.recommendation.ALS.Rating import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted} @@ -78,7 +79,7 @@ class ALSSuite val k = 2 val ne0 = new NormalEquation(k) .add(Array(1.0f, 2.0f), 3.0) - .add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted + .add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted assert(ne0.k === k) assert(ne0.triK === k * (k + 1) / 2) // NumPy code that computes the expected values: @@ -347,6 +348,37 @@ class ALSSuite ALSSuite.genFactors(size, rank, random, a, b) } + /** + * Train ALS using the given training set and parameters + * @param training training dataset + * @param rank rank of the matrix factorization + * @param maxIter max number of iterations + * @param regParam regularization constant + * @param implicitPrefs whether to use implicit preference + * @param numUserBlocks number of user blocks + * @param numItemBlocks number of item blocks + * @return a trained ALSModel + */ + def trainALS( + training: RDD[Rating[Int]], + rank: Int, + maxIter: Int, + regParam: Double, + implicitPrefs: Boolean = false, + numUserBlocks: Int = 2, + numItemBlocks: Int = 3): ALSModel = { + val spark = this.spark + import spark.implicits._ + val als = new ALS() + .setRank(rank) + .setRegParam(regParam) + .setImplicitPrefs(implicitPrefs) + .setNumUserBlocks(numUserBlocks) + .setNumItemBlocks(numItemBlocks) + .setSeed(0) + als.fit(training.toDF()) + } + /** * Test ALS using the given training/test splits and parameters. * @param training training dataset @@ -455,6 +487,22 @@ class ALSSuite targetRMSE = 0.3) } + test("implicit feedback regression") { + val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3))) + val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0))) + val modelWithNeg = + trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val modelWithZero = + trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true) + val userFactorsNeg = modelWithNeg.userFactors + val itemFactorsNeg = modelWithNeg.itemFactors + val userFactorsZero = modelWithZero.userFactors + val itemFactorsZero = modelWithZero.itemFactors + userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" "))) + assert(userFactorsNeg.intersect(userFactorsZero).count() == 0) + assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0) + } test("using generic ID types") { val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01) From 21c0fd9f2e77de4d1003fd9801028a947ce6d338 Mon Sep 17 00:00:00 2001 From: David Eis Date: Thu, 18 May 2017 15:32:54 +0000 Subject: [PATCH 2/2] Fixed comment and zero check --- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 0604a7c2ff6be..3d5fd1794de23 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -769,7 +769,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0. * - * Distributing and letting b,,i,, = d,,i,, * b,,i,, + * Distributing and letting b,,i,, = c,,i,, * d,,i,, * * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0. */ @@ -799,7 +799,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { require(a.length == k) copyToDouble(a) blas.dspr(upper, k, c, da, 1, ata) - if (Math.abs(b) > Double.MinPositiveValue) { + if (b != 0.0) { blas.daxpy(k, b, da, 1, atb, 1) } this