Skip to content

Commit 3971c93

Browse files
committed
[SPARK-4409] Third pass of code review
1 parent 75239f8 commit 3971c93

File tree

3 files changed

+189
-97
lines changed

3 files changed

+189
-97
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Lines changed: 141 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.linalg
1919

2020
import java.util.{Arrays, Random}
2121

22-
import scala.collection.mutable.{ArrayBuffer, Map}
22+
import scala.collection.mutable.{ArrayBuffer, ArrayBuilder, Map}
2323

2424
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
2525

@@ -150,31 +150,35 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
150150

151151
/** Generate a `SparseMatrix` from the given `DenseMatrix`. */
152152
def toSparse(): SparseMatrix = {
153-
val sparseA: ArrayBuffer[Double] = new ArrayBuffer()
154-
val sCols: ArrayBuffer[Int] = new ArrayBuffer(numCols + 1)
155-
val sRows: ArrayBuffer[Int] = new ArrayBuffer()
156-
var i = 0
153+
val spVals: ArrayBuilder[Double] = new ArrayBuilder.ofDouble
154+
val colPtrs: Array[Int] = new Array[Int](numCols + 1)
155+
val rowIndices: ArrayBuilder[Int] = new ArrayBuilder.ofInt
157156
var nnz = 0
158157
var lastCol = -1
159-
values.foreach { v =>
160-
val r = i % numRows
161-
val c = (i - r) / numRows
162-
if (v != 0.0) {
163-
sRows.append(r)
164-
sparseA.append(v)
165-
while (c != lastCol) {
166-
sCols.append(nnz)
167-
lastCol += 1
158+
var j = 0
159+
while (j < numCols) {
160+
var i = 0
161+
val indStart = j * numRows
162+
while (i < numRows) {
163+
val v = values(indStart + i)
164+
if (v != 0.0) {
165+
rowIndices += i
166+
spVals += v
167+
while (j != lastCol) {
168+
colPtrs(lastCol + 1) = nnz
169+
lastCol += 1
170+
}
171+
nnz += 1
168172
}
169-
nnz += 1
173+
i += 1
170174
}
171-
i += 1
175+
j += 1
172176
}
173177
while (numCols > lastCol) {
174-
sCols.append(sparseA.length)
178+
colPtrs(lastCol + 1) = nnz
175179
lastCol += 1
176180
}
177-
new SparseMatrix(numRows, numCols, sCols.toArray, sRows.toArray, sparseA.toArray)
181+
new SparseMatrix(numRows, numCols, colPtrs, rowIndices.result(), spVals.result())
178182
}
179183
}
180184

@@ -358,35 +362,30 @@ object SparseMatrix {
358362

359363
/**
360364
* Generate a `SparseMatrix` from Coordinate List (COO) format. Input must be an array of
361-
* (row, column, value) tuples. Array must be sorted first by *column* index and then by row
362-
* index.
365+
* (row, column, value) tuples.
363366
* @param numRows number of rows of the matrix
364367
* @param numCols number of columns of the matrix
365-
* @param entries Array of ((row, column), value) tuples
368+
* @param entries Array of (row, column, value) tuples
366369
* @return The corresponding `SparseMatrix`
367370
*/
368-
def fromCOO(numRows: Int, numCols: Int, entries: Array[((Int, Int), Double)]): SparseMatrix = {
369-
val colPtrs = new ArrayBuffer[Int](numCols + 1)
370-
colPtrs.append(0)
371+
def fromCOO(numRows: Int, numCols: Int, entries: Array[(Int, Int, Double)]): SparseMatrix = {
372+
val sortedEntries = entries.sortBy(v => (v._2, v._1))
373+
val colPtrs = new Array[Int](numCols + 1)
371374
var nnz = 0
372-
var lastCol = 0
373-
val values = entries.map { case ((i, j), v) =>
375+
var lastCol = -1
376+
val values = sortedEntries.map { case (i, j, v) =>
374377
while (j != lastCol) {
375-
colPtrs.append(nnz)
378+
colPtrs(lastCol + 1) = nnz
376379
lastCol += 1
377-
if (lastCol > numCols) {
378-
throw new IndexOutOfBoundsException("Please make sure that the entries array is " +
379-
"sorted by COLUMN index first and then by row index.")
380-
}
381380
}
382381
nnz += 1
383382
v
384383
}
385384
while (numCols > lastCol) {
386-
colPtrs.append(nnz)
385+
colPtrs(lastCol + 1) = nnz
387386
lastCol += 1
388387
}
389-
new SparseMatrix(numRows, numCols, colPtrs.toArray, entries.map(_._1._1), values)
388+
new SparseMatrix(numRows, numCols, colPtrs.toArray, sortedEntries.map(_._1), values)
390389
}
391390

392391
/**
@@ -411,17 +410,42 @@ object SparseMatrix {
411410
val length = math.ceil(numRows * numCols * density).toInt
412411
val entries = Map[(Int, Int), Double]()
413412
var i = 0
414-
while (i < length) {
415-
var rowIndex = rng.nextInt(numRows)
416-
var colIndex = rng.nextInt(numCols)
417-
while (entries.contains((rowIndex, colIndex))) {
418-
rowIndex = rng.nextInt(numRows)
419-
colIndex = rng.nextInt(numCols)
413+
// Expected number of iterations is less than 1.5 * length
414+
if (density < 0.34) {
415+
while (i < length) {
416+
var rowIndex = rng.nextInt(numRows)
417+
var colIndex = rng.nextInt(numCols)
418+
while (entries.contains((rowIndex, colIndex))) {
419+
rowIndex = rng.nextInt(numRows)
420+
colIndex = rng.nextInt(numCols)
421+
}
422+
entries += (rowIndex, colIndex) -> method(rng)
423+
i += 1
424+
}
425+
} else { // selection - rejection method
426+
var j = 0
427+
val triesPerCol = math.ceil(length * 1.0 / numCols).toInt
428+
val pool = numRows * numCols
429+
// loop over columns so that the sort in fromCOO requires less sorting
430+
while (i < length && j < numCols) {
431+
var k = 0
432+
val leftFromPool = (numCols - j) * numRows
433+
while (k < triesPerCol) {
434+
if (rng.nextDouble() < 1.0 * (length - i) / (pool - leftFromPool)) {
435+
var rowIndex = rng.nextInt(numRows)
436+
val colIndex = j
437+
while (entries.contains((rowIndex, colIndex))) {
438+
rowIndex = rng.nextInt(numRows)
439+
}
440+
entries += (rowIndex, colIndex) -> method(rng)
441+
i += 1
442+
}
443+
k += 1
444+
}
445+
j += 1
420446
}
421-
entries += (rowIndex, colIndex) -> method(rng)
422-
i += 1
423447
}
424-
SparseMatrix.fromCOO(numRows, numCols, entries.toArray.sortBy(v => (v._1._2, v._1._1)))
448+
SparseMatrix.fromCOO(numRows, numCols, entries.toArray.map(v => (v._1._1, v._1._2, v._2)))
425449
}
426450

427451
/**
@@ -462,12 +486,12 @@ object SparseMatrix {
462486
val n = vector.size
463487
vector match {
464488
case sVec: SparseVector =>
465-
val indices = sVec.indices.map(i => (i, i))
466-
SparseMatrix.fromCOO(n, n, indices.zip(sVec.values))
489+
val indices = sVec.indices
490+
SparseMatrix.fromCOO(n, n, indices.zip(sVec.values).map(v => (v._1, v._1, v._2)))
467491
case dVec: DenseVector =>
468492
val values = dVec.values.zipWithIndex
469493
val nnzVals = values.filter(v => v._1 != 0.0)
470-
SparseMatrix.fromCOO(n, n, nnzVals.map(v => ((v._2, v._2), v._1)))
494+
SparseMatrix.fromCOO(n, n, nnzVals.map(v => (v._2, v._2, v._1)))
471495
}
472496
}
473497
}
@@ -613,58 +637,70 @@ object Matrices {
613637
* @return a single `Matrix` composed of the matrices that were horizontally concatenated
614638
*/
615639
def horzcat(matrices: Array[Matrix]): Matrix = {
616-
if (matrices.size == 1) {
617-
return matrices(0)
618-
} else if (matrices.size == 0) {
640+
if (matrices.isEmpty) {
619641
return new DenseMatrix(0, 0, Array[Double]())
642+
} else if (matrices.size == 1) {
643+
return matrices(0)
620644
}
621645
val numRows = matrices(0).numRows
622646
var rowsMatch = true
623-
var hasDense = false
624647
var hasSparse = false
625648
var numCols = 0
626649
matrices.foreach { mat =>
627650
if (numRows != mat.numRows) rowsMatch = false
628651
mat match {
629652
case sparse: SparseMatrix => hasSparse = true
630-
case dense: DenseMatrix => hasDense = true
653+
case dense: DenseMatrix => // empty on purpose
631654
case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
632655
s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
633656
}
634657
numCols += mat.numCols
635658
}
636659
require(rowsMatch, "The number of rows of the matrices in this sequence, don't match!")
637660

638-
if (!hasSparse && hasDense) {
661+
if (!hasSparse) {
639662
new DenseMatrix(numRows, numCols, matrices.flatMap(_.toArray))
640663
} else {
641664
var startCol = 0
642-
val entries: Array[((Int, Int), Double)] = matrices.flatMap {
665+
val entries: Array[(Int, Int, Double)] = matrices.flatMap {
643666
case spMat: SparseMatrix =>
644667
var j = 0
645-
var cnt = 0
646-
val ptr = spMat.colPtrs
647-
val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) =>
648-
cnt += 1
649-
if (cnt <= ptr(j + 1)) {
650-
((i, j + startCol), v)
651-
} else {
652-
while (ptr(j + 1) < cnt) {
653-
j += 1
654-
}
655-
((i, j + startCol), v)
668+
val colPtrs = spMat.colPtrs
669+
val rowIndices = spMat.rowIndices
670+
val values = spMat.values
671+
val data = new Array[(Int, Int, Double)](values.length)
672+
val nCols = spMat.numCols
673+
while (j < nCols) {
674+
var idx = colPtrs(j)
675+
while (idx < colPtrs(j + 1)) {
676+
val i = rowIndices(idx)
677+
val v = values(idx)
678+
data(idx) = (i, j + startCol, v)
679+
idx += 1
656680
}
681+
j += 1
657682
}
658-
startCol += spMat.numCols
683+
startCol += nCols
659684
data
660685
case dnMat: DenseMatrix =>
661-
val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0)
662-
val data = nnzValues.map { case (v, i) =>
663-
val rowIndex = i % dnMat.numRows
664-
val colIndex = i / dnMat.numRows
665-
((rowIndex, colIndex + startCol), v)
686+
val data = new ArrayBuffer[(Int, Int, Double)]()
687+
var j = 0
688+
val nCols = dnMat.numCols
689+
val nRows = dnMat.numRows
690+
val values = dnMat.values
691+
while (j < nCols) {
692+
var i = 0
693+
val indStart = j * nRows
694+
while (i < nRows) {
695+
val v = values(indStart + i)
696+
if (v != 0.0) {
697+
data.append((i, j + startCol, v))
698+
}
699+
i += 1
700+
}
701+
j += 1
666702
}
667-
startCol += dnMat.numCols
703+
startCol += nCols
668704
data
669705
}
670706
SparseMatrix.fromCOO(numRows, numCols, entries)
@@ -679,14 +715,13 @@ object Matrices {
679715
* @return a single `Matrix` composed of the matrices that were vertically concatenated
680716
*/
681717
def vertcat(matrices: Array[Matrix]): Matrix = {
682-
if (matrices.size == 1) {
683-
return matrices(0)
684-
} else if (matrices.size == 0) {
718+
if (matrices.isEmpty) {
685719
return new DenseMatrix(0, 0, Array[Double]())
720+
} else if (matrices.size == 1) {
721+
return matrices(0)
686722
}
687723
val numCols = matrices(0).numCols
688724
var colsMatch = true
689-
var hasDense = false
690725
var hasSparse = false
691726
var numRows = 0
692727
var valsLength = 0
@@ -697,7 +732,6 @@ object Matrices {
697732
hasSparse = true
698733
valsLength += sparse.values.length
699734
case dense: DenseMatrix =>
700-
hasDense = true
701735
valsLength += dense.values.length
702736
case _ => throw new IllegalArgumentException("Unsupported matrix format. Expected " +
703737
s"SparseMatrix or DenseMatrix. Instead got: ${mat.getClass}")
@@ -707,7 +741,7 @@ object Matrices {
707741
}
708742
require(colsMatch, "The number of rows of the matrices in this sequence, don't match!")
709743

710-
if (!hasSparse && hasDense) {
744+
if (!hasSparse) {
711745
val matData = matrices.zipWithIndex.flatMap { case (mat, ind) =>
712746
val values = mat.toArray
713747
for (j <- 0 until numCols) yield (j, ind,
@@ -716,34 +750,46 @@ object Matrices {
716750
new DenseMatrix(numRows, numCols, matData.flatMap(_._3))
717751
} else {
718752
var startRow = 0
719-
val entries: Array[((Int, Int), Double)] = matrices.flatMap {
753+
val entries: Array[(Int, Int, Double)] = matrices.flatMap {
720754
case spMat: SparseMatrix =>
721755
var j = 0
722-
var cnt = 0
723-
val ptr = spMat.colPtrs
724-
val data = spMat.rowIndices.zip(spMat.values).map { case (i, v) =>
725-
cnt += 1
726-
if (cnt <= ptr(j + 1)) {
727-
((i + startRow, j), v)
728-
} else {
729-
while (ptr(j + 1) < cnt) {
730-
j += 1
731-
}
732-
((i + startRow, j), v)
756+
val colPtrs = spMat.colPtrs
757+
val rowIndices = spMat.rowIndices
758+
val values = spMat.values
759+
val data = new Array[(Int, Int, Double)](values.length)
760+
while (j < numCols) {
761+
var idx = colPtrs(j)
762+
while (idx < colPtrs(j + 1)) {
763+
val i = rowIndices(idx)
764+
val v = values(idx)
765+
data(idx) = (i + startRow, j, v)
766+
idx += 1
733767
}
768+
j += 1
734769
}
735770
startRow += spMat.numRows
736771
data
737772
case dnMat: DenseMatrix =>
738-
val nnzValues = dnMat.values.zipWithIndex.filter(v => v._1 != 0.0)
739-
val data = nnzValues.map { case (v, i) =>
740-
val rowIndex = i % dnMat.numRows
741-
val colIndex = i / dnMat.numRows
742-
((rowIndex + startRow, colIndex), v)
773+
val data = new ArrayBuffer[(Int, Int, Double)]()
774+
var j = 0
775+
val nCols = dnMat.numCols
776+
val nRows = dnMat.numRows
777+
val values = dnMat.values
778+
while (j < nCols) {
779+
var i = 0
780+
val indStart = j * nRows
781+
while (i < nRows) {
782+
val v = values(indStart + i)
783+
if (v != 0.0) {
784+
data.append((i + startRow, j, v))
785+
}
786+
i += 1
787+
}
788+
j += 1
743789
}
744-
startRow += dnMat.numRows
790+
startRow += nRows
745791
data
746-
}.sortBy(d => (d._1._2, d._1._1))
792+
}
747793
SparseMatrix.fromCOO(numRows, numCols, entries)
748794
}
749795
}

mllib/src/test/java/org/apache/spark/mllib/linalg/JavaMatricesSuite.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,25 @@ public void zerosMatrixConstruction() {
9999
assertArrayEquals(done.toArray(), new double[]{1.0, 1.0, 1.0, 1.0}, 0.0);
100100
}
101101

102+
@Test
103+
public void sparseDenseConversion() {
104+
int m = 3;
105+
int n = 2;
106+
double[] values = new double[]{1.0, 2.0, 4.0, 5.0};
107+
double[] allValues = new double[]{1.0, 2.0, 0.0, 0.0, 4.0, 5.0};
108+
int[] colPtrs = new int[]{0, 2, 4};
109+
int[] rowIndices = new int[]{0, 1, 1, 2};
110+
111+
SparseMatrix spMat1 = new SparseMatrix(m, n, colPtrs, rowIndices, values);
112+
DenseMatrix deMat1 = new DenseMatrix(m, n, allValues);
113+
114+
SparseMatrix spMat2 = deMat1.toSparse();
115+
DenseMatrix deMat2 = spMat1.toDense();
116+
117+
assertArrayEquals(spMat1.toArray(), spMat2.toArray(), 0.0);
118+
assertArrayEquals(deMat1.toArray(), deMat2.toArray(), 0.0);
119+
}
120+
102121
@Test
103122
public void concatenateMatrices() {
104123
int m = 3;

0 commit comments

Comments
 (0)