From 4661ddbbe02265333f03becc1a0cd10b29fd4109 Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Thu, 27 Apr 2017 18:26:51 -0700
Subject: [PATCH 01/13] Add documentation for the `InBlock` class

---
 .../apache/spark/ml/recommendation/ALS.scala  | 116 ++++++++++++++++--
 1 file changed, 106 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index a20ef7244666..3fe01400f960 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -917,19 +917,115 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   private type OutBlock = Array[Array[Int]]
 
   /**
-   * In-link block for computing src (user/item) factors. This includes the original src IDs
-   * of the elements within this block as well as encoded dst (item/user) indices and corresponding
-   * ratings. The dst indices are in the form of (blockId, localIndex), which are not the original
-   * dst IDs. To compute src factors, we expect receiving dst factors that match the dst indices.
-   * For example, if we have an in-link record
+   * In-link block for computing user and item factor matrices.
    *
-   * {srcId: 0, dstBlockId: 2, dstLocalIndex: 3, rating: 5.0},
+   * The ALS algorithm partitions the columns of the users factor matrix evenly among Spark
+   * workers. Since each column of the factor matrix is calculated using the known ratings of the
+   * corresponding user, and since the ratings don't change across iterations, the ALS algorithm
+   * preshuffles the ratings to the appropriate partitions, storing them in `InBlock` objects.
    *
-   * and assume that the dst factors are stored as dstFactors: Map[Int, Array[Array[Float]]], which
-   * is a blockId to dst factors map, the corresponding dst factor of the record is dstFactor(2)(3).
+   * The in-link blocks for ratings shuffled by item ID are built similarly and also stored as
+   * `InBlock` objects. Note that this means every rating is stored twice, once shuffled by user
+   * ID and once shuffled by item ID. This is a necessary tradeoff, since in general a rating
+   * will not reside on the same worker when partitioned by user ID as when partitioned by item ID.
    *
-   * We use a CSC-like (compressed sparse column) format to store the in-link information. So we can
-   * compute src factors one after another using only one normal equation instance.
+   * =Example=
+   *
+   * Say we have a small collection of eight items to offer the seven users in our application.
We + * have some known ratings given by the users, as seen in the matrix below: + * + * {{{ + * Items + * 0 1 2 3 4 5 6 7 + * +---+---+---+---+---+---+---+---+ + * 0 | |0.1| | |0.4| | |0.7| + * +---+---+---+---+---+---+---+---+ + * 1 | | | | | | | | | + * +---+---+---+---+---+---+---+---+ + * U 2 | | | | | | | | | + * s +---+---+---+---+---+---+---+---+ + * e 3 | |3.1| | |3.4| | |3.7| + * r +---+---+---+---+---+---+---+---+ + * s 4 | | | | | | | | | + * +---+---+---+---+---+---+---+---+ + * 5 | | | | | | | | | + * +---+---+---+---+---+---+---+---+ + * 6 | |6.1| | |6.4| | |6.7| + * +---+---+---+---+---+---+---+---+ + * }}} + * + * The ratings are represented as an RDD, passed to the `partitionRatings` method as the `ratings` + * parameter: + * + * {{{ + * ratings.collect() == Seq( + * Rating(0, 1, 0.1f), + * Rating(0, 4, 0.4f), + * Rating(0, 7, 0.7f), + * Rating(3, 1, 3.1f), + * Rating(3, 4, 3.4f), + * Rating(3, 7, 3.7f), + * Rating(6, 1, 6.1f), + * Rating(6, 4, 6.4f), + * Rating(6, 7, 6.7f) + * ) + * }}} + * + * (In this contrived example, the rating values are chosen specifically for clarity and are in + * the form ''x''.''y'', where ''x'' is the user ID and ''y'' is the item ID. Note that in a real + * use case, the ratings given by users would more likely be whole numbers.) + * + * Say that we are using two partitions to calculate each factor matrix: + * + * {{{ + * val userPart = new ALSPartitioner(2) + * val itemPart = new ALSPartitioner(2) + * val blockRatings = partitionRatings(ratings, userPart, itemPart) + * }}} + * + * Ratings with even-valued user IDs are shuffled to partition 0 while those with odd-valued user + * IDs are shuffled to partition 1: + * + * {{{ + * userInBlocks.collect() == Seq( + * 0 -> Seq( + * // Internally, the class stores the ratings in a more optimized format than + * // a sequence of `Rating`s, but for clarity we show it as such here. + * Rating(0, 1, 0.1f), + * Rating(0, 4, 0.4f), + * Rating(0, 7, 0.7f), + * Rating(6, 1, 6.1f), + * Rating(6, 4, 6.4f), + * Rating(6, 7, 6.7f) + * ), + * 1 -> Seq( + * Rating(3, 1, 3.1f), + * Rating(3, 4, 3.4f), + * Rating(3, 7, 3.7f) + * ) + * ) + * }}} + * + * Similarly, ratings with even-valued item IDs are shuffled to partition 0 while those with + * odd-valued item IDs are shuffled to partition 1: + * + * {{{ + * itemInBlocks.collect() == Seq( + * 0 -> Seq( + * Rating(0, 4, 0.4f), + * Rating(3, 4, 3.4f), + * Rating(6, 4, 6.4f) + * ), + * 1 -> Seq( + * Rating(0, 1, 0.1f), + * Rating(0, 7, 0.7f), + * Rating(3, 1, 3.1f), + * Rating(3, 7, 3.7f), + * Rating(6, 1, 6.1f), + * Rating(6, 7, 6.7f) + * ) + * ) + * }}} * * @param srcIds src ids (ordered) * @param dstPtrs dst pointers. 
Elements in range [dstPtrs(i), dstPtrs(i+1)) of dst indices and

From 7d1491e27dc7d6dbe634f4274584dc1fe9a8ecae Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Thu, 27 Apr 2017 18:41:05 -0700
Subject: [PATCH 02/13] Add documentation for the `OutBlock` data type

---
 .../org/apache/spark/ml/recommendation/ALS.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 3fe01400f960..c346c6d13cbe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -910,9 +910,14 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   private type FactorBlock = Array[Array[Float]]
 
   /**
-   * Out-link block that stores, for each dst (item/user) block, which src (user/item) factors to
-   * send. For example, outLinkBlock(0) contains the local indices (not the original src IDs) of the
-   * src factors in this block to send to dst block 0.
+   * Out-link blocks that store information about which columns of the items factor matrix are
+   * required to calculate which columns of the users factor matrix, and vice versa.
+   *
+   * Specifically, when calculating a user factor vector, since only those columns of the items
+   * factor matrix that correspond to the items that that user has rated are needed, we can avoid
+   * having to repeatedly copy the entire items factor matrix to each worker later in the algorithm
+   * by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s. The
+   * items' dependencies on the columns of the users factor matrix are computed similarly.
    */
   private type OutBlock = Array[Array[Int]]

From 2fdbcaa70f7d487cff4885ed87e7ee609aa6b24b Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Thu, 27 Apr 2017 18:43:37 -0700
Subject: [PATCH 03/13] Add documentation for `partitionRatings` method

---
 .../apache/spark/ml/recommendation/ALS.scala | 30 +++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index c346c6d13cbe..2c975902c8fd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -1127,7 +1127,24 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   }
 
   /**
-   * Partitions raw ratings into blocks.
+   * Groups an RDD of `Rating`s by the user partition and item partition to which each `Rating` maps
+   * according to the given partitioners. The returned pair RDD holds the ratings, encoded in a
+   * memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item
+   * partition ID) pair.
+   *
+   * Performance note: This is an expensive operation that performs an RDD shuffle.
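+   *
+   * For example (an illustrative sketch, reusing the ratings and the two-partition
+   * `ALSPartitioner`s from the `InBlock` Scaladoc, and showing each `RatingBlock` as a plain
+   * sequence of `Rating`s rather than in its actual encoded form), the returned RDD would hold:
+   *
+   * {{{
+   *   blockRatings.collect() == Seq(
+   *     (0, 0) -> Seq(Rating(0, 4, 0.4f), Rating(6, 4, 6.4f)),
+   *     (0, 1) -> Seq(Rating(0, 1, 0.1f), Rating(0, 7, 0.7f),
+   *                   Rating(6, 1, 6.1f), Rating(6, 7, 6.7f)),
+   *     (1, 0) -> Seq(Rating(3, 4, 3.4f)),
+   *     (1, 1) -> Seq(Rating(3, 1, 3.1f), Rating(3, 7, 3.7f))
+   *   )
+   * }}}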
+   *
+   * Implementation note: This implementation produces the same result as the following but
+   * generates fewer intermediate objects:
+   *
+   * {{{
+   *   ratings.map { r =>
+   *     ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
+   *   }.aggregateByKey(new RatingBlockBuilder)(
+   *       seqOp = (b, r) => b.add(r),
+   *       combOp = (b0, b1) => b0.merge(b1.build()))
+   *     .mapValues(_.build())
+   * }}}
    *
    * @param ratings raw ratings
    * @param srcPart partitioner for src IDs
    * @param dstPart partitioner for dst IDs
    */
   private def partitionRatings[ID: ClassTag](
       ratings: RDD[Rating[ID]],
       srcPart: Partitioner,
       dstPart: Partitioner): RDD[((Int, Int), RatingBlock[ID])] = {
-
-    /* The implementation produces the same result as the following but generates less objects.
-
-    ratings.map { r =>
-      ((srcPart.getPartition(r.user), dstPart.getPartition(r.item)), r)
-    }.aggregateByKey(new RatingBlockBuilder)(
-        seqOp = (b, r) => b.add(r),
-        combOp = (b0, b1) => b0.merge(b1.build()))
-      .mapValues(_.build())
-    */
-
     val numPartitions = srcPart.numPartitions * dstPart.numPartitions
     ratings.mapPartitions { iter =>
       val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])

From fb8f16df6c5b744a9312226493899ed09bf8d1ce Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Thu, 27 Apr 2017 18:45:51 -0700
Subject: [PATCH 04/13] Add documentation for `ALS.train` method

---
 .../apache/spark/ml/recommendation/ALS.scala  | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 2c975902c8fd..9a15828e53b0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -774,6 +774,28 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   /**
    * :: DeveloperApi ::
    * Implementation of the ALS algorithm.
+   *
+   * This implementation of the ALS factorization algorithm partitions the two sets of factors among
+   * Spark workers so as to reduce network communication by only sending one copy of each factor
+   * vector to each Spark worker on each iteration, and only if needed. This is achieved by
+   * precomputing some information about the ratings matrix to determine which users require which
+   * item factors and vice versa. See the Scaladoc for [[OutBlock]] for a detailed explanation of
+   * how the precomputation is done.
+   *
+   * In addition, since each iteration of calculating the factor matrices depends on the known
+   * ratings, which are spread across Spark partitions, a naive implementation would incur
+   * significant network communication overhead between Spark workers, as the ratings RDD would be
+   * repeatedly shuffled during each iteration. This implementation reduces that overhead by
+   * performing the shuffling operation up front, precomputing each partition's ratings dependencies
+   * and duplicating those values to the appropriate workers before starting iterations to solve for
+   * the factor matrices. See the Scaladoc for [[InBlock]] for a detailed explanation of how the
+   * precomputation is done.
+   *
+   * Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by
+   * contiguous blocks from the ratings matrix but by a hash function on the rating's location in
+   * the matrix. When visualizing the partitions, it may be easier to think of the term "block" as
+   * referring to a subset of an RDD containing the ratings rather than to a contiguous submatrix
+   * of the ratings matrix.
    */
   @DeveloperApi
   def train[ID: ClassTag]( // scalastyle:ignore

From 0a2edf0a09bdbb1ff81f1cde9a8c60b15ce2b68f Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Thu, 27 Apr 2017 18:50:37 -0700
Subject: [PATCH 05/13] Add inline comments to `ALS.train` method

---
 .../apache/spark/ml/recommendation/ALS.scala  | 26 ++++++++++++++-----
 1 file changed, 19 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 9a15828e53b0..bfebd5b5961d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -813,32 +813,43 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       checkpointInterval: Int = 10,
       seed: Long = 0L)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
+    require(!ratings.isEmpty(), s"No ratings available from $ratings")
     require(intermediateRDDStorageLevel != StorageLevel.NONE,
       "ALS is not designed to run without persisting intermediate RDDs.")
+
     val sc = ratings.sparkContext
+
+    // Precompute the rating dependencies of each partition
     val userPart = new ALSPartitioner(numUserBlocks)
     val itemPart = new ALSPartitioner(numItemBlocks)
-    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
-    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
-    val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
     val blockRatings = partitionRatings(ratings, userPart, itemPart)
       .persist(intermediateRDDStorageLevel)
     val (userInBlocks, userOutBlocks) =
       makeBlocks("user", blockRatings, userPart, itemPart, intermediateRDDStorageLevel)
-    // materialize blockRatings and user blocks
-    userOutBlocks.count()
+    userOutBlocks.count() // materialize blockRatings and user blocks
     val swappedBlockRatings = blockRatings.map {
       case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
         ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
     }
     val (itemInBlocks, itemOutBlocks) =
       makeBlocks("item", swappedBlockRatings, itemPart, userPart, intermediateRDDStorageLevel)
-    // materialize item blocks
-    itemOutBlocks.count()
+    itemOutBlocks.count() // materialize item blocks
+
+    // Encoders for storing each user/item's partition ID and index within its partition using a
+    // single integer; used as an optimization
+    val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
+    val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
+
+    // These are the user and item factor matrices that, once trained, are multiplied together to
+    // estimate the rating matrix. The two matrices are stored in RDDs, partitioned by column such
+    // that each factor column resides on the same Spark worker as its corresponding user or item.
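+    // For example (an illustrative sketch based on the `InBlock` Scaladoc's example, which has
+    // two user partitions): `userFactors` would hold the pairs (0, <factor vectors for users 0
+    // and 6>) and (1, <factor vector for user 3>), with each factor vector containing `rank`
+    // floats.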
val seedGen = new XORShiftRandom(seed) var userFactors = initialize(userInBlocks, rank, seedGen.nextLong()) var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong()) + + val solver = if (nonnegative) new NNLSSolver else new CholeskySolver + var previousCheckpointFile: Option[String] = None val shouldCheckpoint: Int => Boolean = (iter) => sc.checkpointDir.isDefined && checkpointInterval != -1 && (iter % checkpointInterval == 0) @@ -852,6 +863,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { logWarning(s"Cannot delete checkpoint file $file:", e) } } + if (implicitPrefs) { for (iter <- 1 to maxIter) { userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel) From 6da60f0700fddea187ec9bdee1f0c020c09ea569 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Fri, 28 Apr 2017 17:02:57 -0700 Subject: [PATCH 06/13] Clarify that IDs are mapped to partitions by modulus --- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index bfebd5b5961d..0dea294751a6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -1022,8 +1022,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * val blockRatings = partitionRatings(ratings, userPart, itemPart) * }}} * - * Ratings with even-valued user IDs are shuffled to partition 0 while those with odd-valued user - * IDs are shuffled to partition 1: + * Ratings are mapped to partitions using the user/item IDs modulo the number of partitions. With + * two partitions, ratings with even-valued user IDs are shuffled to partition 0 while those with + * odd-valued user IDs are shuffled to partition 1: * * {{{ * userInBlocks.collect() == Seq( From 57de83bb10c08b942e143cee0d0da42957f153ab Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Fri, 28 Apr 2017 17:05:46 -0700 Subject: [PATCH 07/13] Add link to `Rating` in Scaladoc for `partitionRatings` --- .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 0dea294751a6..0a482e3b8ff6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -1162,9 +1162,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { } /** - * Groups an RDD of `Rating`s by the user partition and item partition to which each `Rating` maps - * according to the given partitioners. The returned pair RDD holds the ratings, encoded in a - * memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item + * Groups an RDD of [[Rating]]s by the user partition and item partition to which each `Rating` + * maps according to the given partitioners. The returned pair RDD holds the ratings, encoded in + * a memory-efficient format but otherwise unchanged, keyed by the (user partition ID, item * partition ID) pair. * * Performance note: This is an expensive operation that performs an RDD shuffle. 
From 5a4eb8544cae8812ed7ef8cb3d4d47eb38423277 Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Fri, 28 Apr 2017 18:22:25 -0700
Subject: [PATCH 08/13] Fix Scaladoc errors

I don't believe Scaladoc can link to nested classes
---
 .../scala/org/apache/spark/ml/recommendation/ALS.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 0a482e3b8ff6..7c2a21419474 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -779,8 +779,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
    * Spark workers so as to reduce network communication by only sending one copy of each factor
    * vector to each Spark worker on each iteration, and only if needed. This is achieved by
    * precomputing some information about the ratings matrix to determine which users require which
-   * item factors and vice versa. See the Scaladoc for [[OutBlock]] for a detailed explanation of
-   * how the precomputation is done.
+   * item factors and vice versa. See the Scaladoc for `OutBlock` for a detailed explanation of how
+   * the precomputation is done.
    *
    * In addition, since each iteration of calculating the factor matrices depends on the known
@@ -788,7 +788,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
    * repeatedly shuffled during each iteration. This implementation reduces that overhead by
    * performing the shuffling operation up front, precomputing each partition's ratings dependencies
    * and duplicating those values to the appropriate workers before starting iterations to solve for
-   * the factor matrices. See the Scaladoc for [[InBlock]] for a detailed explanation of how the
+   * the factor matrices. See the Scaladoc for `InBlock` for a detailed explanation of how the
    * precomputation is done.
    *
    * Note that the term "rating block" is a bit of a misnomer, as the ratings are not partitioned by
@@ -1277,8 +1277,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     def length: Int = srcIds.length
 
     /**
-     * Compresses the block into an [[InBlock]]. The algorithm is the same as converting a
-     * sparse matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
+     * Compresses the block into an `InBlock`. The algorithm is the same as converting a sparse
+     * matrix from coordinate list (COO) format into compressed sparse column (CSC) format.
      * Sorting is done using Spark's built-in Timsort to avoid generating too many objects.
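+     *
+     * For example (an illustrative sketch, reusing the user in-link block for partition 0 from
+     * the `InBlock` Scaladoc, and writing each encoded dst index as an unencoded
+     * (dstBlockId, dstLocalIndex) pair for readability):
+     *
+     * {{{
+     *   // COO: one (srcId, dstIndex, rating) triple per known rating, in no particular order
+     *   (0, (1, 0), 0.1f), (0, (0, 0), 0.4f), (0, (1, 1), 0.7f),
+     *   (6, (1, 0), 6.1f), (6, (0, 0), 6.4f), (6, (1, 1), 6.7f)
+     *
+     *   // CSC: the triples sorted by srcId, with the srcIds deduplicated into a pointer array
+     *   srcIds  = [0, 6]
+     *   dstPtrs = [0, 3, 6]  // ratings of srcIds(i) occupy range [dstPtrs(i), dstPtrs(i + 1))
+     *   dstEncodedIndices = [(1, 0), (0, 0), (1, 1), (1, 0), (0, 0), (1, 1)]
+     *   ratings = [0.1f, 0.4f, 0.7f, 6.1f, 6.4f, 6.7f]
+     * }}}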
     */
    def compress(): InBlock[ID] = {

From e5cdba112081da28dc6bfe628ee04a3a54205921 Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Sun, 30 Apr 2017 02:11:41 -0700
Subject: [PATCH 09/13] Remove unnecessary paragraph in Scaladoc

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 7c2a21419474..c849825afc88 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -1010,10 +1010,6 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
    * )
    * }}}
    *
-   * (In this contrived example, the rating values are chosen specifically for clarity and are in
-   * the form ''x''.''y'', where ''x'' is the user ID and ''y'' is the item ID. Note that in a real
-   * use case, the ratings given by users would more likely be whole numbers.)
-   *
    * Say that we are using two partitions to calculate each factor matrix:
    *
    * {{{

From c82501a6945256ba6b602bdfbb89ef4671d86970 Mon Sep 17 00:00:00 2001
From: Daniel Li
Date: Wed, 3 May 2017 16:13:02 -0700
Subject: [PATCH 10/13] Clarify Scaladoc for `OutBlock`

---
 .../apache/spark/ml/recommendation/ALS.scala  | 23 +++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index c849825afc88..a5e12a8d49cb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -944,14 +944,33 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   private type FactorBlock = Array[Array[Float]]
 
   /**
-   * Out-link blocks that store information about which columns of the items factor matrix are
-   * required to calculate which columns of the users factor matrix, and vice versa.
+   * A mapping of the columns of the items factor matrix that are needed when calculating each
+   * column of the users factor matrix, and vice versa.
    *
    * Specifically, when calculating a user factor vector, since only those columns of the items
    * factor matrix that correspond to the items that that user has rated are needed, we can avoid
    * having to repeatedly copy the entire items factor matrix to each worker later in the algorithm
    * by precomputing these dependencies for all users, storing them in an RDD of `OutBlock`s. The
    * items' dependencies on the columns of the users factor matrix are computed similarly.
+   *
+   * =Example=
+   *
+   * Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the
+   * following:
+   *
+   * {{{ userOutBlocks.collect() == Seq(
+   *       0 -> Array(Array(0, 1), Array(0, 1)),
+   *       1 -> Array(Array(0), Array(0))) }}}
+   *
+   * The data structure encodes the following information:
+   *
+   * * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the
+   * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1
+   * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions).
+   *
+   * * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user
+   * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that
+   * `Array(0)` appears in both the 0th and 1st positions).
*/ private type OutBlock = Array[Array[Int]] From 983f9eb49fc9ca4f97822f4363eb5b04377e1a27 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Fri, 5 May 2017 23:34:09 -0700 Subject: [PATCH 11/13] Correct spacing in Scaladoc --- .../org/apache/spark/ml/recommendation/ALS.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index a5e12a8d49cb..78d435be863c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -958,19 +958,22 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * Using the example provided in the `InBlock` Scaladoc, `userOutBlocks` would look like the * following: * - * {{{ userOutBlocks.collect() == Seq( + * {{{ + * userOutBlocks.collect() == Seq( * 0 -> Array(Array(0, 1), Array(0, 1)), - * 1 -> Array(Array(0), Array(0))) }}} + * 1 -> Array(Array(0), Array(0)) + * ) + * }}} * * The data structure encodes the following information: * * * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the - * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 - * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions). + * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 + * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions). * * * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user - * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that - * `Array(0)` appears in both the 0th and 1st positions). + * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact + * that `Array(0)` appears in both the 0th and 1st positions). */ private type OutBlock = Array[Array[Int]] From 3d5d8a6f6c4db7a6639994a175ba584157eea10f Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Fri, 5 May 2017 23:55:50 -0700 Subject: [PATCH 12/13] Add more clarifying explanation about `OutBlock`s --- .../scala/org/apache/spark/ml/recommendation/ALS.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 78d435be863c..9ae849ce9099 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -965,7 +965,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * ) * }}} * - * The data structure encodes the following information: + * Each value in this map-like sequence is of type `Array[Array[Int]]`. The values in the + * inner array are the ranks of the sorted user IDs in that partition; so in the example above, + * `Array(0, 1)` in partition 0 refers to user IDs 0 and 6, since when all unique user IDs in + * partition 0 are sorted, 0 is the first ID and 6 is the second. The position of each inner + * array in its enclosing outer array denotes the partition number to which item IDs map; in the + * example, the first `Array(0, 1)` is in position 0 of its outer array, denoting item IDs that + * map to partition 0. 
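+   *
+   * As one illustrative reading of the structure above: to find which of its factor vectors user
+   * partition 0 must send to item partition 1, we look up position 1 of that partition's
+   * out-block (the variable name below is hypothetical):
+   *
+   * {{{
+   *   val outBlockForUserPartition0: OutBlock = Array(Array(0, 1), Array(0, 1))
+   *   val localIndices = outBlockForUserPartition0(1)  // == Array(0, 1)
+   *   // Positions 0 and 1 within partition 0's sorted unique user IDs (0 and 6), meaning the
+   *   // factor vectors of users 0 and 6 are the ones sent to item partition 1.
+   * }}}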
+ * + * In summary, the data structure encodes the following information: * * * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 From 6d27fffee8b3deafca8445cfa7afccc2ed49b1ad Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Sat, 6 May 2017 00:07:29 -0700 Subject: [PATCH 13/13] Fix failing doc build --- .../scala/org/apache/spark/ml/recommendation/ALS.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 9ae849ce9099..1562bf1beb7e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -976,12 +976,12 @@ object ALS extends DefaultParamsReadable[ALS] with Logging { * In summary, the data structure encodes the following information: * * * There are ratings with user IDs 0 and 6 (encoded in `Array(0, 1)`, where 0 and 1 are the - * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 - * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions). + * indices of the user IDs 0 and 6 on partition 0) whose item IDs map to partitions 0 and 1 + * (represented by the fact that `Array(0, 1)` appears in both the 0th and 1st positions). * * * There are ratings with user ID 3 (encoded in `Array(0)`, where 0 is the index of the user - * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact - * that `Array(0)` appears in both the 0th and 1st positions). + * ID 3 on partition 1) whose item IDs map to partitions 0 and 1 (represented by the fact that + * `Array(0)` appears in both the 0th and 1st positions). */ private type OutBlock = Array[Array[Int]]