|
18 | 18 | package org.apache.spark.mllib.linalg.distributed |
19 | 19 |
|
20 | 20 | import breeze.linalg.{DenseMatrix => BDM} |
21 | | -import org.apache.spark.util.Utils |
22 | 21 |
|
23 | 22 | import org.apache.spark.{Logging, Partitioner} |
24 | | -import org.apache.spark.mllib.linalg._ |
25 | | -import org.apache.spark.mllib.rdd.RDDFunctions._ |
| 23 | +import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix} |
26 | 24 | import org.apache.spark.rdd.RDD |
27 | 25 | import org.apache.spark.storage.StorageLevel |
28 | 26 |
|
29 | 27 | /** |
30 | | - * A grid partitioner, which stores every block in a separate partition. |
| 28 | + * A grid partitioner, which uses a regular grid to partition coordinates. |
31 | 29 | * |
32 | | - * @param numRowBlocks Number of blocks that form the rows of the matrix. |
33 | | - * @param numColBlocks Number of blocks that form the columns of the matrix. |
34 | | - * @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number |
35 | | - * of partitions will be set to `min(suggestedNumPartitions, |
36 | | - * numRowBlocks * numColBlocks)`, because setting the number of |
37 | | - * partitions greater than the number of sub matrices is not useful. |
| 30 | + * @param rows Number of rows. |
| 31 | + * @param cols Number of columns. |
| 32 | + * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge. |
| 33 | + * @param colsPerPart Number of columns per partition, which may be less at the right edge. |
38 | 34 | */ |
39 | 35 | private[mllib] class GridPartitioner( |
40 | | - val numRowBlocks: Int, |
41 | | - val numColBlocks: Int, |
42 | | - suggestedNumPartitions: Int) extends Partitioner { |
43 | | - private val totalBlocks = numRowBlocks.toLong * numColBlocks |
44 | | - // Having the number of partitions greater than the number of sub matrices does not help |
45 | | - override val numPartitions = math.min(suggestedNumPartitions, totalBlocks).toInt |
46 | | - |
47 | | - private val blockLengthsPerPartition = findOptimalBlockLengths |
48 | | - // Number of neighboring blocks to take in each row |
49 | | - private val numRowBlocksPerPartition = blockLengthsPerPartition._1 |
50 | | - // Number of neighboring blocks to take in each column |
51 | | - private val numColBlocksPerPartition = blockLengthsPerPartition._2 |
52 | | - // Number of rows of partitions |
53 | | - private val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt |
| 36 | + val rows: Int, |
| 37 | + val cols: Int, |
| 38 | + val rowsPerPart: Int, |
| 39 | + val colsPerPart: Int) extends Partitioner { |
| 40 | + |
| 41 | + require(rows > 0) |
| 42 | + require(cols > 0) |
| 43 | + require(rowsPerPart > 0) |
| 44 | + require(colsPerPart > 0) |
| 45 | + |
| 46 | + private val rowPartitions = math.ceil(rows / rowsPerPart).toInt |
| 47 | + private val colPartitions = math.ceil(cols / colsPerPart).toInt |
| 48 | + |
| 49 | + override val numPartitions = rowPartitions * colPartitions |
54 | 50 |
|
55 | 51 | /** |
56 | | - * Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise |
57 | | - * partitioning. |
| 52 | + * Returns the index of the partition the input coordinate belongs to. |
58 | 53 | * |
59 | | - * @param key The key for the SubMatrix. Can be its position in the grid (its column major index) |
60 | | - * or a tuple of three integers that are the final row index after the multiplication, |
61 | | - * the index of the block to multiply with, and the final column index after the |
| 54 | + * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in |
62 | 55 | * multiplication. |
63 | | - * @return The index of the partition, which the SubMatrix belongs to. |
| 56 | + * @return The index of the partition, which the coordinate belongs to. |
64 | 57 | */ |
65 | 58 | override def getPartition(key: Any): Int = { |
66 | 59 | key match { |
67 | | - case (blockRowIndex: Int, blockColIndex: Int) => |
68 | | - getPartitionId(blockRowIndex, blockColIndex) |
69 | | - case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) => |
70 | | - getPartitionId(blockRowIndex, blockColIndex) |
| 60 | + case (i: Int, j: Int) => |
| 61 | + getPartitionId(i, j) |
| 62 | + case (i: Int, j: Int, _) => |
| 63 | + getPartitionId(i, j) |
71 | 64 | case _ => |
72 | | - throw new IllegalArgumentException(s"Unrecognized key. key: $key") |
| 65 | + throw new IllegalArgumentException(s"Unrecognized key: $key") |
73 | 66 | } |
74 | 67 | } |
75 | 68 |
|
76 | 69 | /** Partitions sub-matrices as blocks with neighboring sub-matrices. */ |
77 | | - private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = { |
78 | | - require(0 <= blockRowIndex && blockRowIndex < numRowBlocks, "The blockRowIndex in the key " + |
79 | | - s"must be in the range 0 <= blockRowIndex < numRowBlocks. blockRowIndex: $blockRowIndex," + |
80 | | - s"numRowBlocks: $numRowBlocks") |
81 | | - require(0 <= blockRowIndex && blockColIndex < numColBlocks, "The blockColIndex in the key " + |
82 | | - s"must be in the range 0 <= blockRowIndex < numColBlocks. blockColIndex: $blockColIndex, " + |
83 | | - s"numColBlocks: $numColBlocks") |
84 | | - // Coordinates of the block |
85 | | - val i = blockRowIndex / numRowBlocksPerPartition |
86 | | - val j = blockColIndex / numColBlocksPerPartition |
87 | | - // The mod shouldn't be required but is added as a guarantee for possible corner cases |
88 | | - Utils.nonNegativeMod(j * blocksPerRow + i, numPartitions) |
89 | | - } |
90 | | - |
91 | | - /** Tries to calculate the optimal number of blocks that should be in each partition. */ |
92 | | - private def findOptimalBlockLengths: (Int, Int) = { |
93 | | - // Gives the optimal number of blocks that need to be in each partition |
94 | | - val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt |
95 | | - // Number of neighboring blocks to take in each row |
96 | | - var m = math.ceil(math.sqrt(targetNumBlocksPerPartition)).toInt |
97 | | - // Number of neighboring blocks to take in each column |
98 | | - var n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt |
99 | | - // Try to make m and n close to each other while making sure that we don't exceed the number |
100 | | - // of partitions |
101 | | - var numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m) |
102 | | - var numBlocksForCols = math.ceil(numColBlocks * 1.0 / n) |
103 | | - while ((numBlocksForRows * numBlocksForCols > numPartitions) && (m * n != 0)) { |
104 | | - if (numRowBlocks <= numColBlocks) { |
105 | | - m += 1 |
106 | | - n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt |
107 | | - } else { |
108 | | - n += 1 |
109 | | - m = math.ceil(targetNumBlocksPerPartition * 1.0 / n).toInt |
110 | | - } |
111 | | - numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m) |
112 | | - numBlocksForCols = math.ceil(numColBlocks * 1.0 / n) |
113 | | - } |
114 | | - // If a good partitioning scheme couldn't be found, set the side with the smaller dimension to |
115 | | - // 1 and the other to the number of targetNumBlocksPerPartition |
116 | | - if (m * n == 0) { |
117 | | - if (numRowBlocks <= numColBlocks) { |
118 | | - m = 1 |
119 | | - n = targetNumBlocksPerPartition |
120 | | - } else { |
121 | | - n = 1 |
122 | | - m = targetNumBlocksPerPartition |
123 | | - } |
124 | | - } |
125 | | - (m, n) |
| 70 | + private def getPartitionId(i: Int, j: Int): Int = { |
| 71 | + require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).") |
| 72 | + require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).") |
| 73 | + i / rowsPerPart + j / colsPerPart * rowPartitions |
126 | 74 | } |
127 | 75 |
|
128 | 76 | /** Checks whether the partitioners have the same characteristics */ |
129 | 77 | override def equals(obj: Any): Boolean = { |
130 | 78 | obj match { |
131 | 79 | case r: GridPartitioner => |
132 | | - (this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) && |
133 | | - (this.numPartitions == r.numPartitions) |
| 80 | + (this.rows == r.rows) && (this.cols == r.cols) && |
| 81 | + (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart) |
134 | 82 | case _ => |
135 | 83 | false |
136 | 84 | } |
137 | 85 | } |
138 | 86 | } |
139 | 87 |
|
| 88 | +private[mllib] object GridPartitioner { |
| 89 | + |
| 90 | + def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = { |
| 91 | + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) |
| 92 | + } |
| 93 | + |
| 94 | + def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = { |
| 95 | + require(suggestedNumPartitions > 0) |
| 96 | + val scale = 1.0 / math.sqrt(suggestedNumPartitions) |
| 97 | + val rowsPerPart = math.round(math.max(scale * rows, 1.0)).toInt |
| 98 | + val colsPerPart = math.round(math.max(scale * cols, 1.0)).toInt |
| 99 | + new GridPartitioner(rows, cols, rowsPerPart, colsPerPart) |
| 100 | + } |
| 101 | +} |
| 102 | + |
140 | 103 | /** |
141 | 104 | * Represents a distributed matrix in blocks of local matrices. |
142 | 105 | * |
@@ -191,7 +154,7 @@ class BlockMatrix( |
191 | 154 | val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt |
192 | 155 |
|
193 | 156 | private[mllib] var partitioner: GridPartitioner = |
194 | | - new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length) |
| 157 | + GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = rdd.partitions.size) |
195 | 158 |
|
196 | 159 | /** Returns the dimensions of the matrix. */ |
197 | 160 | private def getDim: (Long, Long) = { |
|
0 commit comments