
Commit e258e50

epahomov authored and pwendell committed
[SPARK-1259] Make RDD locally iterable
Author: Egor Pakhomov <[email protected]>

Closes #156 from epahomov/SPARK-1259 and squashes the following commits:

8ec8f24 [Egor Pakhomov] Make to local iterator shorter
34aa300 [Egor Pakhomov] Fix toLocalIterator docs
08363ef [Egor Pakhomov] SPARK-1259 from toLocallyIterable to toLocalIterator
6a994eb [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
8be3dcf [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
33ecb17 [Egor Pakhomov] SPARK-1259 Make RDD locally iterable
1 parent 7012ffa commit e258e50

4 files changed: +35 -1 lines changed
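The commit adds toLocalIterator, which returns the RDD's elements to the driver one partition at a time: the driver only ever holds the largest partition in memory, whereas collect() materializes the whole RDD at once. A minimal usage sketch (not part of this commit; assumes a live SparkContext named sc):

    // Build an RDD with many partitions, then iterate over it on the driver.
    // Each partition is fetched by a separate job only when the iterator reaches it.
    val rdd = sc.parallelize(1 to 1000000, 100)
    for (x <- rdd.toLocalIterator) {
      // process x locally without collecting the entire RDD
    }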

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 13 additions & 1 deletion
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -280,6 +281,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator(): JIterator[T] = {
+    import scala.collection.JavaConversions._
+    rdd.toLocalIterator
+  }
+
+
   /**
    * Return an array that contains all of the elements in this RDD.
    * @deprecated As of Spark 1.0.0, toArray() is deprecated, use {@link #collect()} instead
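The Java wrapper above relies on the scala.collection.JavaConversions import to implicitly turn the Scala Iterator returned by rdd.toLocalIterator into the declared java.util.Iterator (JIterator). A small standalone sketch (not from the commit) showing that conversion done explicitly:

    import scala.collection.JavaConversions.asJavaIterator

    val scalaIter: Iterator[Int] = Iterator(1, 2, 3)
    // Explicit form of the implicit conversion that toLocalIterator() depends on
    val javaIter: java.util.Iterator[Int] = asJavaIterator(scalaIter)
    while (javaIter.hasNext) println(javaIter.next())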

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 12 additions & 0 deletions
@@ -661,6 +661,18 @@ abstract class RDD[T: ClassTag](
     Array.concat(results: _*)
   }
 
+  /**
+   * Return an iterator that contains all of the elements in this RDD.
+   *
+   * The iterator will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocalIterator: Iterator[T] = {
+    def collectPartition(p: Int): Array[T] = {
+      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    }
+    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
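The memory bound stated in the Scaladoc follows from the laziness of Iterator.flatMap: runJob is invoked for partition i only once the consumer has exhausted partition i - 1, so at most one partition's worth of data is materialized on the driver at a time. A standalone sketch of the same pattern (plain Scala, no Spark; partitions and collectPartition are illustrative stand-ins):

    // Stand-ins for an RDD's partitions and for running a job on a single partition.
    val partitions = Array(Array(1, 2), Array(3, 4), Array(5, 6))
    def collectPartition(p: Int): Array[Int] = {
      println(s"fetching partition $p")  // in the real code this is sc.runJob(...)
      partitions(p)
    }

    val it = (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
    // Taking only the first two elements fetches partition 0 and never touches 1 or 2.
    println(it.take(2).toList)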

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 9 additions & 0 deletions
@@ -22,6 +22,7 @@
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -179,6 +180,14 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @Test
+  public void toLocalIterator() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
+    Assert.assertTrue(correct.equals(result));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void lookup() {

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
