Skip to content

Commit 8b25838

Browse files
committed
Use Iterators in columnSimilarities to allow spill to disk
Excessive memory buildup could happen during mapPartitionsWithIndex on a dense and large column — by emitting an Iterator instead of a fully materialized buffer, Spark can spill the pairs onto disk rather than holding all of them in memory before they are consumed. (See SPARK-6713, apache/spark#5364, from which this code is lifted.)
1 parent cba9a68 commit 8b25838

File tree

1 file changed

+10
-11
lines changed

1 file changed

+10
-11
lines changed

src/main/scala/io/github/karlhigley/lexrank/linalg/RowMatrixOps.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ class RowMatrixOps(val rows: RDD[Vector]) {
5353
val rand = new Random(indx)
5454
val scaled = new Array[Double](p.size)
5555
iter.flatMap { row =>
56-
val buf = new ListBuffer[((Int, Int), Double)]()
5756
row match {
5857
case SparseVector(size, indices, values) =>
5958
val nnz = indices.size
@@ -62,8 +61,9 @@ class RowMatrixOps(val rows: RDD[Vector]) {
6261
scaled(k) = values(k) / q(indices(k))
6362
k += 1
6463
}
65-
k = 0
66-
while (k < nnz) {
64+
65+
Iterator.tabulate (nnz) { k =>
66+
val buf = new ListBuffer[((Int, Int), Double)]()
6767
val i = indices(k)
6868
val iVal = scaled(k)
6969
if (iVal != 0 && rand.nextDouble() < p(i)) {
@@ -77,8 +77,8 @@ class RowMatrixOps(val rows: RDD[Vector]) {
7777
l += 1
7878
}
7979
}
80-
k += 1
81-
}
80+
buf
81+
}.flatten
8282
/*
8383
case DenseVector(values) =>
8484
val n = values.size
@@ -87,8 +87,8 @@ class RowMatrixOps(val rows: RDD[Vector]) {
8787
scaled(i) = values(i) / q(i)
8888
i += 1
8989
}
90-
i = 0
91-
while (i < n) {
90+
Iterator.tabulate (n) { i =>
91+
val buf = new ListBuffer[((Int, Int), Double)]()
9292
val iVal = scaled(i)
9393
if (iVal != 0 && rand.nextDouble() < p(i)) {
9494
var j = i + 1
@@ -100,11 +100,10 @@ class RowMatrixOps(val rows: RDD[Vector]) {
100100
j += 1
101101
}
102102
}
103-
i += 1
104-
}
105-
*/
103+
buf
104+
}.flatten
105+
*/
106106
}
107-
buf
108107
}
109108
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
110109
MatrixEntry(i.toLong, j.toLong, sim)

0 commit comments

Comments
 (0)