diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index bd451634e53d..62bf18d82d9b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
+/**
+ * Defines operations common to several Java RDD implementations.
+ * Note that this trait is not intended to be implemented by user code.
+ */
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def wrapRDD(rdd: RDD[T]): This
 
@@ -435,6 +439,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def first(): T = rdd.first()
 
+  /**
+   * @return true if and only if the RDD contains no elements at all. Note that an RDD
+   *         may be empty even when it has at least 1 partition.
+   */
+  def isEmpty(): Boolean = rdd.isEmpty()
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5118e2b91112..97012c7033f9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1175,6 +1175,12 @@ abstract class RDD[T: ClassTag](
    * */
   def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
 
+  /**
+   * @return true if and only if the RDD contains no elements at all. Note that an RDD
+   *         may be empty even when it has at least 1 partition.
+   */
+  def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 07b1e44d04be..004de05c10ee 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -606,6 +606,27 @@ public void take() {
     rdd.takeSample(false, 2, 42);
   }
 
+  @Test
+  public void isEmpty() {
+    Assert.assertTrue(sc.emptyRDD().isEmpty());
+    Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());
+    Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());
+    Assert.assertTrue(sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(
+        new Function<Integer, Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i < 0;
+          }
+        }).isEmpty());
+    Assert.assertFalse(sc.parallelize(Arrays.asList(1, 2, 3)).filter(
+        new Function<Integer, Boolean>() {
+          @Override
+          public Boolean call(Integer i) {
+            return i > 1;
+          }
+        }).isEmpty());
+  }
+
   @Test
   public void cartesian() {
     JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0deb9b18b868..381ee2d45630 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
     assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
+    assert(!nums.isEmpty())
     assert(nums.max() === 4)
     assert(nums.min() === 1)
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
@@ -545,6 +546,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sortedTopK === nums.sorted(ord).take(5))
   }
 
+  test("isEmpty") {
+    assert(sc.emptyRDD.isEmpty())
+    assert(sc.parallelize(Seq[Int]()).isEmpty())
+    assert(!sc.parallelize(Seq(1)).isEmpty())
+    assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
+    assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
+  }
+
   test("sample preserves partitioner") {
     val partitioner = new HashPartitioner(2)
     val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 0ccbfcb0c43f..95fef23ee4f3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -82,6 +82,10 @@ object MimaExcludes {
             // SPARK-5166 Spark SQL API stabilization
             ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
             ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
+          ) ++ Seq(
+            // SPARK-5270
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.api.java.JavaRDDLike.isEmpty")
           )
 
         case v if v.startsWith("1.2") =>
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index c1120cf781e5..4977400ac1c0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1130,6 +1130,18 @@ def first(self):
             return rs[0]
         raise ValueError("RDD is empty")
 
+    def isEmpty(self):
+        """
+        Returns true if and only if the RDD contains no elements at all. Note that an RDD
+        may be empty even when it has at least 1 partition.
+
+        >>> sc.parallelize([]).isEmpty()
+        True
+        >>> sc.parallelize([1]).isEmpty()
+        False
+        """
+        return self._jrdd.partitions().size() == 0 or len(self.take(1)) == 0
+
     def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
         """
         Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
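
For context, a minimal usage sketch of the API this patch adds (not part of the patch itself); the object name, app name, and `local[2]` master setting are illustrative assumptions only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative example only: exercising the RDD.isEmpty() method introduced
// by SPARK-5270, assuming a Spark build that includes this change.
object IsEmptyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("isEmpty-example").setMaster("local[2]"))
    try {
      // An RDD can be empty even though it has partitions.
      val empty = sc.parallelize(Seq.empty[Int], numSlices = 3)
      println(empty.isEmpty())                                   // true

      // Filtering away every element also yields an empty RDD.
      println(sc.parallelize(1 to 10).filter(_ < 0).isEmpty())   // true

      // A non-empty RDD.
      println(sc.parallelize(Seq(1, 2, 3)).isEmpty())            // false
    } finally {
      sc.stop()
    }
  }
}
```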