@@ -267,6 +267,22 @@ public void call(String s) throws IOException {
267267 Assert .assertEquals (2 , accum .value ().intValue ());
268268 }
269269
270+ @ Test
271+ public void foreachPartition () {
272+ final Accumulator <Integer > accum = sc .accumulator (0 );
273+ JavaRDD <String > rdd = sc .parallelize (Arrays .asList ("Hello" , "World" ));
274+ rdd .foreachPartition (new VoidFunction <Iterator <String >>() {
275+ @ Override
276+ public void call (Iterator <String > iter ) throws IOException {
277+ while (iter .hasNext ()) {
278+ iter .next ();
279+ accum .add (1 );
280+ }
281+ }
282+ });
283+ Assert .assertEquals (2 , accum .value ().intValue ());
284+ }
285+
270286 @ Test
271287 public void toLocalIterator () {
272288 List <Integer > correct = Arrays .asList (1 , 2 , 3 , 4 );
@@ -657,6 +673,13 @@ public Boolean call(Integer i) {
657673 }).isEmpty ());
658674 }
659675
676+ @ Test
677+ public void toArray () {
678+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 ));
679+ List <Integer > list = rdd .toArray ();
680+ Assert .assertEquals (Arrays .asList (1 , 2 , 3 ), list );
681+ }
682+
660683 @ Test
661684 public void cartesian () {
662685 JavaDoubleRDD doubleRDD = sc .parallelizeDoubles (Arrays .asList (1.0 , 1.0 , 2.0 , 3.0 , 5.0 , 8.0 ));
@@ -714,6 +737,80 @@ public void javaDoubleRDDHistoGram() {
714737 sc .parallelizeDoubles (new ArrayList <Double >(0 ), 1 ).histogram (new double []{0.0 , 1.0 }));
715738 }
716739
740+ private static class DoubleComparator implements Comparator <Double >, Serializable {
741+ public int compare (Double o1 , Double o2 ) {
742+ return o1 .compareTo (o2 );
743+ }
744+ }
745+
746+ @ Test
747+ public void max () {
748+ JavaDoubleRDD rdd = sc .parallelizeDoubles (Arrays .asList (1.0 , 2.0 , 3.0 , 4.0 ));
749+ double max = rdd .max (new DoubleComparator ());
750+ Assert .assertEquals (4.0 , max , 0.001 );
751+ }
752+
753+ @ Test
754+ public void min () {
755+ JavaDoubleRDD rdd = sc .parallelizeDoubles (Arrays .asList (1.0 , 2.0 , 3.0 , 4.0 ));
756+ double max = rdd .min (new DoubleComparator ());
757+ Assert .assertEquals (1.0 , max , 0.001 );
758+ }
759+
760+ @ Test
761+ public void takeOrdered () {
762+ JavaDoubleRDD rdd = sc .parallelizeDoubles (Arrays .asList (1.0 , 2.0 , 3.0 , 4.0 ));
763+ Assert .assertEquals (Arrays .asList (1.0 , 2.0 ), rdd .takeOrdered (2 , new DoubleComparator ()));
764+ Assert .assertEquals (Arrays .asList (1.0 , 2.0 ), rdd .takeOrdered (2 ));
765+ }
766+
767+ @ Test
768+ public void top () {
769+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 ));
770+ List <Integer > top2 = rdd .top (2 );
771+ Assert .assertEquals (Arrays .asList (4 , 3 ), top2 );
772+ }
773+
774+ private static class AddInts implements Function2 <Integer , Integer , Integer > {
775+ @ Override
776+ public Integer call (Integer a , Integer b ) {
777+ return a + b ;
778+ }
779+ }
780+
781+ @ Test
782+ public void reduce () {
783+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 ));
784+ int sum = rdd .reduce (new AddInts ());
785+ Assert .assertEquals (10 , sum );
786+ }
787+
788+ @ Test
789+ public void reduceOnJavaDoubleRDD () {
790+ JavaDoubleRDD rdd = sc .parallelizeDoubles (Arrays .asList (1.0 , 2.0 , 3.0 , 4.0 ));
791+ double sum = rdd .reduce (new Function2 <Double , Double , Double >() {
792+ @ Override
793+ public Double call (Double v1 , Double v2 ) throws Exception {
794+ return v1 + v2 ;
795+ }
796+ });
797+ Assert .assertEquals (10.0 , sum , 0.001 );
798+ }
799+
800+ @ Test
801+ public void fold () {
802+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 ));
803+ int sum = rdd .fold (0 , new AddInts ());
804+ Assert .assertEquals (10 , sum );
805+ }
806+
807+ @ Test
808+ public void aggregate () {
809+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 ));
810+ int sum = rdd .aggregate (0 , new AddInts (), new AddInts ());
811+ Assert .assertEquals (10 , sum );
812+ }
813+
717814 @ Test
718815 public void map () {
719816 JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 , 5 ));
@@ -830,6 +927,25 @@ public Iterable<Integer> call(Iterator<Integer> iter) {
830927 Assert .assertEquals ("[3, 7]" , partitionSums .collect ().toString ());
831928 }
832929
930+
931+ @ Test
932+ public void mapPartitionsWithIndex () {
933+ JavaRDD <Integer > rdd = sc .parallelize (Arrays .asList (1 , 2 , 3 , 4 ), 2 );
934+ JavaRDD <Integer > partitionSums = rdd .mapPartitionsWithIndex (
935+ new Function2 <Integer , Iterator <Integer >, Iterator <Integer >>() {
936+ @ Override
937+ public Iterator <Integer > call (Integer index , Iterator <Integer > iter ) throws Exception {
938+ int sum = 0 ;
939+ while (iter .hasNext ()) {
940+ sum += iter .next ();
941+ }
942+ return Collections .singletonList (sum ).iterator ();
943+ }
944+ }, false );
945+ Assert .assertEquals ("[3, 7]" , partitionSums .collect ().toString ());
946+ }
947+
948+
833949 @ Test
834950 public void repartition () {
835951 // Shrinking number of partitions
@@ -1516,6 +1632,19 @@ public void collectAsync() throws Exception {
15161632 Assert .assertEquals (1 , future .jobIds ().size ());
15171633 }
15181634
1635+ @ Test
1636+ public void takeAsync () throws Exception {
1637+ List <Integer > data = Arrays .asList (1 , 2 , 3 , 4 , 5 );
1638+ JavaRDD <Integer > rdd = sc .parallelize (data , 1 );
1639+ JavaFutureAction <List <Integer >> future = rdd .takeAsync (1 );
1640+ List <Integer > result = future .get ();
1641+ Assert .assertEquals (1 , result .size ());
1642+ Assert .assertEquals ((Integer ) 1 , result .get (0 ));
1643+ Assert .assertFalse (future .isCancelled ());
1644+ Assert .assertTrue (future .isDone ());
1645+ Assert .assertEquals (1 , future .jobIds ().size ());
1646+ }
1647+
15191648 @ Test
15201649 public void foreachAsync () throws Exception {
15211650 List <Integer > data = Arrays .asList (1 , 2 , 3 , 4 , 5 );
0 commit comments