Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ class IndexedRowMatrix @Since("1.0.0") (
}

/**
* Converts to BlockMatrix. Creates blocks of `SparseMatrix` with size 1024 x 1024.
* Converts to BlockMatrix. Creates blocks with size 1024 x 1024.
*/
@Since("1.3.0")
def toBlockMatrix(): BlockMatrix = {
toBlockMatrix(1024, 1024)
}

/**
* Converts to BlockMatrix. Creates blocks of `SparseMatrix`.
* Converts to BlockMatrix. Blocks may be sparse or dense depending on the sparsity of the rows.
* @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
* a smaller value. Must be an integer value greater than 0.
* @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
Expand All @@ -108,8 +108,64 @@ class IndexedRowMatrix @Since("1.0.0") (
*/
@Since("1.3.0")
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
// TODO: This implementation may be optimized
toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
require(rowsPerBlock > 0,
s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
require(colsPerBlock > 0,
s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")

val m = numRows()
val n = numCols()
val lastRowBlockIndex = m / rowsPerBlock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is the index of the final, smaller block, if any? I get it, but if m = 100 and n = 10 then this is 10, which is not the index of the last row block. There is no leftover smaller block and the last one is 9. I think the code works and I'm splitting hairs but wonder if this is clearer if it's the "remainder" block index or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Replaced word "last" with "remainder" and added a small clarifying comment.

val lastColBlockIndex = n / colsPerBlock
val lastRowBlockSize = (m % rowsPerBlock).toInt
val lastColBlockSize = (n % colsPerBlock).toInt
val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt

val blocks = rows.flatMap { ir: IndexedRow =>
val blockRow = ir.index / rowsPerBlock
val rowInBlock = ir.index % rowsPerBlock

ir.vector match {
case SparseVector(size, indices, values) =>
indices.zip(values).map { case (index, value) =>
val blockColumn = index / colsPerBlock
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's an assumption here that the block index can't be larger than an Int, but it could, right? conceptually the index in an IndexedRow could be huge. Does blockRow need to stay a Long or am I overlooking why it won't happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it is true that IndexedRowMatrix could have a Long number of rows, but BlockMatrix is backed by an RDD of ((Int, Int), Matrix), so we're limited by that. I can just add a check that computes whether it's possible to make a BlockMatrix from the given IndexedRowMatrix.

val columnInBlock = index % colsPerBlock
((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt, Array((value, columnInBlock))))
}
case DenseVector(values) =>
values.grouped(colsPerBlock)
.zipWithIndex
.map { case (values, blockColumn) =>
((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
}
}
}.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
case ((blockRow, blockColumn), itr) =>
val actualNumRows =
if (blockRow == lastRowBlockIndex) lastRowBlockSize else rowsPerBlock
val actualNumColumns =
if (blockColumn == lastColBlockIndex) lastColBlockSize else colsPerBlock

val arraySize = actualNumRows * actualNumColumns
val matrixAsArray = new Array[Double](arraySize)
var countForValues = 0
itr.foreach { case (rowWithinBlock, valuesWithColumns) =>
valuesWithColumns.foreach { case (value, columnWithinBlock) =>
matrixAsArray.update(columnWithinBlock * actualNumRows + rowWithinBlock, value)
countForValues += 1
}
}
val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns, matrixAsArray)
val finalMatrix = if (countForValues / arraySize.toDouble >= 0.5) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if 0.5 is a proper value practically. Maybe others have more ideas about it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockMatrix seems to use sparse representations when <= 10% of values are non-zero when converting to an indexed row matrix. Maybe go with that?

denseMatrix
} else {
denseMatrix.toSparse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, this isn't inefficient because making the dense matrix doesn't copy or anything. Seems OK

}

((blockRow, blockColumn), finalMatrix)
}
new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, this.numRows(), this.numCols())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can the last two args simply be m, n for clarity?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}

import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.rdd.RDD

Expand Down Expand Up @@ -87,21 +87,74 @@ class IndexedRowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(coordMat.toBreeze() === idxRowMat.toBreeze())
}

test("toBlockMatrix") {
val idxRowMat = new IndexedRowMatrix(indexedRows)
val blockMat = idxRowMat.toBlockMatrix(2, 2)
test("toBlockMatrix dense backing") {
val idxRowMatDense = new IndexedRowMatrix(indexedRows)

// Tests when n % colsPerBlock != 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only tests a IndexedRowMatrix consisted of DenseVector. We should add another test to test the cases of SparseVector and the mix of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll write more tests today.

val blockMat = idxRowMatDense.toBlockMatrix(2, 2)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMat.toBreeze())
assert(blockMat.toBreeze() === idxRowMatDense.toBreeze())

// Tests when m % rowsPerBlock != 0
val blockMat2 = idxRowMatDense.toBlockMatrix(3, 1)
assert(blockMat2.numRows() === m)
assert(blockMat2.numCols() === n)
assert(blockMat2.toBreeze() === idxRowMatDense.toBreeze())

intercept[IllegalArgumentException] {
idxRowMat.toBlockMatrix(-1, 2)
idxRowMatDense.toBlockMatrix(-1, 2)
}
intercept[IllegalArgumentException] {
idxRowMat.toBlockMatrix(2, 0)
idxRowMatDense.toBlockMatrix(2, 0)
}
assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the style looks weird.

Maybe:

assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
  matrix.isInstanceOf[DenseMatrix]
}.reduce(_ && _))

matrix.isInstanceOf[DenseMatrix]}.reduce(_ && _))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check if blockMat2 is all sparse matrix backing?

}

test("toBlockMatrix sparse backing") {
val sparseData = Seq(
(3L, Vectors.sparse(3, Seq((0, 4.0))))
).map(x => IndexedRow(x._1, x._2))

val idxRowMatSparse = new IndexedRowMatrix(sc.parallelize(sparseData))

// Tests when n % colsPerBlock != 0
val blockMat = idxRowMatSparse.toBlockMatrix(2, 2)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMatSparse.toBreeze())

// Tests when m % rowsPerBlock != 0
val blockMat2 = idxRowMatSparse.toBlockMatrix(3, 1)
assert(blockMat2.numRows() === m)
assert(blockMat2.numCols() === n)
assert(blockMat2.toBreeze() === idxRowMatSparse.toBreeze())

assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this just blockMat.blocks.forall { case (_, matrix) => matrix.isInstanceOf[SparseMatrix] }?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure there is no forall on RDD's, which is why I wrote it this way. Could do it as collect().forall I suppose.

matrix.isInstanceOf[SparseMatrix]}.reduce(_ && _))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also check if blockMat2 is all sparse matrix backing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done, but comment is not showing as outdated.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: styling like above suggested.


test("toBlockMatrix mixed backing") {
val mixedData = Seq(
(0L, Vectors.dense(1, 2, 3)),
(3L, Vectors.sparse(3, Seq((0, 4.0)))))

val idxRowMatMixed = new IndexedRowMatrix(
sc.parallelize(mixedData.map(x => IndexedRow(x._1, x._2))))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent with above, can we move the call .map(x => IndexedRow(x._1, x._2)) to mixedData?


val blockMat = idxRowMatMixed.toBlockMatrix(2, 2)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMatMixed.toBreeze())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add test for Tests when m % rowsPerBlock != 0?


val blocks = blockMat.blocks.collect()

blocks.forall { case((row, col), matrix) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment with a simple diagram to show the content of the matrix? So it can be easy to judge which part is sparse/dense.

if (row == 0) matrix.isInstanceOf[DenseMatrix] else matrix.isInstanceOf[SparseMatrix]}
}


Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove extra space line.

test("multiply a local matrix") {
val A = new IndexedRowMatrix(indexedRows)
val B = Matrices.dense(3, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
Expand Down