Closed
@@ -91,15 +91,15 @@ class IndexedRowMatrix @Since("1.0.0") (
}

/**
* Converts to BlockMatrix. Creates blocks of `SparseMatrix` with size 1024 x 1024.
* Converts to BlockMatrix. Creates blocks with size 1024 x 1024.
*/
@Since("1.3.0")
def toBlockMatrix(): BlockMatrix = {
toBlockMatrix(1024, 1024)
}

/**
* Converts to BlockMatrix. Creates blocks of `SparseMatrix`.
* Converts to BlockMatrix. Blocks may be sparse or dense depending on the sparsity of the rows.
* @param rowsPerBlock The number of rows of each block. The blocks at the bottom edge may have
* a smaller value. Must be an integer value greater than 0.
* @param colsPerBlock The number of columns of each block. The blocks at the right edge may have
@@ -108,8 +108,69 @@ class IndexedRowMatrix @Since("1.0.0") (
*/
@Since("1.3.0")
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
// TODO: This implementation may be optimized
toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
require(rowsPerBlock > 0,
s"rowsPerBlock needs to be greater than 0. rowsPerBlock: $rowsPerBlock")
require(colsPerBlock > 0,
s"colsPerBlock needs to be greater than 0. colsPerBlock: $colsPerBlock")

// Since block matrices require an integer row index
require(numRows() / rowsPerBlock.toDouble <= Int.MaxValue,
Member

So I guess the previous toBlockMatrix would also have failed when the number of rows exceeded this threshold? It looks like it, given how CoordinateMatrix.toBlockMatrix works. I wonder if you should add this check there too, because otherwise it will fail mysteriously. The result might even be wrong.

BTW, on second look, I realize this check isn't quite the same as the math performed below: math.ceil(m.toDouble / rowsPerBlock).toInt. I think you want to check exactly the same thing. Maybe move the check below the declarations of m and n, and just say: require(math.ceil(m.toDouble / rowsPerBlock) <= Int.MaxValue). That's very clear.

Also, cols need to be checked.
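
A minimal sketch of the check suggested above, written as plain Scala without Spark so it can be run on its own. The object and function names (`BlockCountCheck`, `blockCount`) are hypothetical and not part of the PR; the point is only that the `require` tests the same `math.ceil` expression that is later narrowed with `.toInt`.

```scala
object BlockCountCheck {
  /** Number of blocks needed to cover `total` rows (or columns) in blocks of `perBlock`. */
  def blockCount(total: Long, perBlock: Int): Int = {
    require(perBlock > 0, s"perBlock needs to be greater than 0. perBlock: $perBlock")
    // Evaluate the expression once, as a Double, before any narrowing.
    val count = math.ceil(total.toDouble / perBlock)
    // Check exactly the value that is about to be converted to Int.
    require(count <= Int.MaxValue,
      s"Number of blocks ($count) cannot exceed Int.MaxValue.")
    count.toInt
  }
}
```

With this shape, the guard and the computation cannot drift apart, because both read the same `count` value.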

Member

+1 We should fix CoordinateMatrix.toBlockMatrix too.

Member

For cols, we may not need this check, because each IndexedRow can have at most Int.MaxValue columns.

Contributor Author

Yeah, even with a block size of one, each IndexedRow is limited by the length of its backing array, which is itself limited by the maximum Int, so this should be fine.

"Number of rows divided by rowsPerBlock cannot exceed maximum integer.")

val m = numRows()
val n = numCols()
// The remainder calculations only matter when m % rowsPerBlock != 0 or n % colsPerBlock != 0
val remainderRowBlockIndex = m / rowsPerBlock
val remainderColBlockIndex = n / colsPerBlock
val remainderRowBlockSize = (m % rowsPerBlock).toInt
val remainderColBlockSize = (n % colsPerBlock).toInt
val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt

val blocks = rows.flatMap { ir: IndexedRow =>
val blockRow = ir.index / rowsPerBlock
val rowInBlock = ir.index % rowsPerBlock

ir.vector match {
case SparseVector(size, indices, values) =>
indices.zip(values).map { case (index, value) =>
val blockColumn = index / colsPerBlock
Member

I think there's an assumption here that the block index can't be larger than an Int, but it could, right? Conceptually, the index in an IndexedRow could be huge. Does blockRow need to stay a Long, or am I overlooking why that won't happen?

Contributor Author

So it is true that IndexedRowMatrix could have a Long number of rows, but BlockMatrix is backed by an RDD of ((Int, Int), Matrix), so we're limited by that. I can just add a check that computes whether it's possible to make a BlockMatrix from the given IndexedRowMatrix.
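
To illustrate why such a check matters, here is a small Spark-free sketch. It relies on the standard behavior of `Long.toInt`, which keeps only the low 32 bits and so wraps silently when the value is out of range. The helper `toBlockIndex` is hypothetical and is not the check that was added to the PR.

```scala
object LongToIntNarrowing {
  // One past Int.MaxValue wraps around to Int.MinValue instead of failing.
  val wrapped: Int = (Int.MaxValue.toLong + 1L).toInt

  /** Hypothetical helper: compute a block row index and fail loudly if it does not fit an Int. */
  def toBlockIndex(rowIndex: Long, rowsPerBlock: Int): Int = {
    require(rowsPerBlock > 0, s"rowsPerBlock needs to be greater than 0: $rowsPerBlock")
    val blockRow = rowIndex / rowsPerBlock // still a Long at this point
    require(blockRow <= Int.MaxValue,
      s"Block row index $blockRow does not fit in an Int.")
    blockRow.toInt
  }
}
```

Without a guard like this, an oversized row index would map to a wrong (possibly negative) block key rather than raising an error.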

val columnInBlock = index % colsPerBlock
((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt, Array((value, columnInBlock))))
}
case DenseVector(values) =>
values.grouped(colsPerBlock)
.zipWithIndex
.map { case (values, blockColumn) =>
((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
}
}
}.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
case ((blockRow, blockColumn), itr) =>
val actualNumRows =
if (blockRow == remainderRowBlockIndex) remainderRowBlockSize else rowsPerBlock
val actualNumColumns =
if (blockColumn == remainderColBlockIndex) remainderColBlockSize else colsPerBlock

val arraySize = actualNumRows * actualNumColumns
val matrixAsArray = new Array[Double](arraySize)
var countForValues = 0
itr.foreach { case (rowWithinBlock, valuesWithColumns) =>
valuesWithColumns.foreach { case (value, columnWithinBlock) =>
matrixAsArray.update(columnWithinBlock * actualNumRows + rowWithinBlock, value)
countForValues += 1
}
}
val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns, matrixAsArray)
val finalMatrix = if (countForValues / arraySize.toDouble >= 0.1) {
denseMatrix
} else {
denseMatrix.toSparse
Member

OK, this isn't inefficient, because constructing the dense matrix doesn't copy the array. Seems OK.
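
For readers following the block assembly this comment refers to, the sketch below reproduces the two ideas in plain Scala: filling a column-major array at `column * numRows + row`, and choosing dense versus sparse storage from the fill ratio with the 0.1 threshold used in the diff. The names `BlockAssembly`, `assemble`, and `keepDense` are made up for illustration, and no Spark matrix classes are involved.

```scala
object BlockAssembly {
  /** Fills a column-major array from (row, col, value) entries; returns it with the fill ratio. */
  def assemble(
      numRows: Int,
      numCols: Int,
      entries: Seq[(Int, Int, Double)]): (Array[Double], Double) = {
    val data = new Array[Double](numRows * numCols)
    var count = 0
    entries.foreach { case (row, col, value) =>
      // Column-major layout: the elements of one column are contiguous.
      data(col * numRows + row) = value
      count += 1
    }
    (data, count / data.length.toDouble)
  }

  /** Same threshold as the diff: dense at or above 0.1, sparse below. */
  def keepDense(density: Double): Boolean = density >= 0.1
}
```

For example, a single entry in an 8 x 8 block gives a fill ratio of 1/64, which falls below the threshold, so that block would be stored sparsely.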

}

((blockRow, blockColumn), finalMatrix)
}
new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, this.numRows(), this.numCols())
Member

Nit: can the last two args simply be m, n for clarity?

}

/**
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{diag => brzDiag, DenseMatrix => BDM, DenseVector => BDV}

import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.rdd.RDD

@@ -87,19 +87,96 @@ class IndexedRowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(coordMat.toBreeze() === idxRowMat.toBreeze())
}

test("toBlockMatrix") {
val idxRowMat = new IndexedRowMatrix(indexedRows)
val blockMat = idxRowMat.toBlockMatrix(2, 2)
test("toBlockMatrix dense backing") {
val idxRowMatDense = new IndexedRowMatrix(indexedRows)

// Tests when n % colsPerBlock != 0
Member

This only tests an IndexedRowMatrix consisting of DenseVector rows. We should add tests that cover SparseVector rows and a mix of the two.

Contributor Author

I'll write more tests today.

val blockMat = idxRowMatDense.toBlockMatrix(2, 2)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMat.toBreeze())
assert(blockMat.toBreeze() === idxRowMatDense.toBreeze())

// Tests when m % rowsPerBlock != 0
val blockMat2 = idxRowMatDense.toBlockMatrix(3, 1)
assert(blockMat2.numRows() === m)
assert(blockMat2.numCols() === n)
assert(blockMat2.toBreeze() === idxRowMatDense.toBreeze())

intercept[IllegalArgumentException] {
idxRowMat.toBlockMatrix(-1, 2)
idxRowMatDense.toBlockMatrix(-1, 2)
}
intercept[IllegalArgumentException] {
idxRowMat.toBlockMatrix(2, 0)
idxRowMatDense.toBlockMatrix(2, 0)
}

assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
Member

nit: the style looks weird.

Maybe:

assert(blockMat.blocks.map { case (_, matrix: Matrix) =>
  matrix.isInstanceOf[DenseMatrix]
}.reduce(_ && _))

matrix.isInstanceOf[DenseMatrix]
}.reduce(_ && _))
assert(blockMat2.blocks.map { case (_, matrix: Matrix) =>
Member

nit: style as suggested above.

matrix.isInstanceOf[DenseMatrix]
}.reduce(_ && _))
}

test("toBlockMatrix sparse backing") {
val sparseData = Seq(
(15L, Vectors.sparse(12, Seq((0, 4.0))))
).map(x => IndexedRow(x._1, x._2))

// Gonna make m and n larger here so the matrices can easily be completely sparse:
val m = 16
val n = 12

val idxRowMatSparse = new IndexedRowMatrix(sc.parallelize(sparseData))

// Tests when n % colsPerBlock != 0
val blockMat = idxRowMatSparse.toBlockMatrix(8, 8)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMatSparse.toBreeze())

// Tests when m % rowsPerBlock != 0
val blockMat2 = idxRowMatSparse.toBlockMatrix(6, 6)
assert(blockMat2.numRows() === m)
assert(blockMat2.numCols() === n)
assert(blockMat2.toBreeze() === idxRowMatSparse.toBreeze())

assert(blockMat.blocks.collect().forall{ case (_, matrix: Matrix) =>
matrix.isInstanceOf[SparseMatrix]
})
assert(blockMat2.blocks.collect().forall{ case (_, matrix: Matrix) =>
matrix.isInstanceOf[SparseMatrix]
})
}
Member

nit: style as suggested above.
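
As a side note on the two assertion styles that appear in these tests (`map { ... }.reduce(_ && _)` and `forall { ... }`), the sketch below compares them on an ordinary Scala `Seq` rather than an RDD. The `Block` types are stand-ins invented for the example. On non-empty input both give the same answer; on an empty `Seq`, `forall` returns true while `reduce` throws.

```scala
object AllMatch {
  // Stand-in types for illustration only; not Spark's matrix classes.
  sealed trait Block
  case object Dense extends Block
  case object Sparse extends Block

  val blocks: Seq[Block] = Seq(Dense, Dense, Sparse)

  // Style 1: map every element to a Boolean, then AND the results together.
  val viaReduce: Boolean = blocks.map(_ == Dense).reduce(_ && _)

  // Style 2: forall expresses the same intent directly and stops at the first false.
  val viaForall: Boolean = blocks.forall(_ == Dense)

  // Empty input: forall is vacuously true, reduce has nothing to combine and throws.
  val emptyForall: Boolean = Seq.empty[Block].forall(_ == Dense)
  val emptyReduce = scala.util.Try(Seq.empty[Boolean].reduce(_ && _))
}
```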


test("toBlockMatrix mixed backing") {
val m = 24
val n = 18

val mixedData = Seq(
(0L, Vectors.dense((0 to 17).map(_.toDouble).toArray)),
(1L, Vectors.dense((0 to 17).map(_.toDouble).toArray)),
(23L, Vectors.sparse(18, Seq((0, 4.0)))))
.map(x => IndexedRow(x._1, x._2))

val idxRowMatMixed = new IndexedRowMatrix(
sc.parallelize(mixedData))

// Tests when n % colsPerBlock != 0
val blockMat = idxRowMatMixed.toBlockMatrix(12, 12)
assert(blockMat.numRows() === m)
assert(blockMat.numCols() === n)
assert(blockMat.toBreeze() === idxRowMatMixed.toBreeze())
Member

Can we also add a test for the case when m % rowsPerBlock != 0?
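
The arithmetic behind that case can be worked through with the numbers from this test (m = 24, n = 18). The sketch below is plain Scala and mirrors the remainder logic in the diff; `EdgeBlocks` and `blockSize` are hypothetical names used only here.

```scala
object EdgeBlocks {
  /** Size of block `blockIndex` along one dimension of length `total`. */
  def blockSize(total: Long, perBlock: Int, blockIndex: Int): Int = {
    // Index and size of the trailing partial block. When total divides evenly,
    // remainderIndex equals the block count and is never matched by a real block.
    val remainderIndex = total / perBlock
    val remainderSize = (total % perBlock).toInt
    if (blockIndex == remainderIndex) remainderSize else perBlock
  }
}
```

With rowsPerBlock = 18, the 24 rows split into one full block of 18 rows and one edge block of 6 rows. With colsPerBlock = 6, the 18 columns split evenly into three blocks of 6, so no edge block arises on that axis.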


// Tests when m % rowsPerBlock != 0
val blockMat2 = idxRowMatMixed.toBlockMatrix(18, 6)
assert(blockMat2.numRows() === m)
assert(blockMat2.numCols() === n)
assert(blockMat2.toBreeze() === idxRowMatMixed.toBreeze())

val blocks = blockMat.blocks.collect()

assert(blocks.forall { case((row, col), matrix) =>
if (row == 0) matrix.isInstanceOf[DenseMatrix] else matrix.isInstanceOf[SparseMatrix]})
}

test("multiply a local matrix") {