Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
*/
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This

Expand Down Expand Up @@ -435,6 +439,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def first(): T = rdd.first()

/**
 * Tests whether this RDD holds no elements by delegating to `rdd.isEmpty()`.
 *
 * @return true if and only if the RDD contains no elements at all. Note that an RDD
 *         may be empty even when it has at least 1 partition.
 */
def isEmpty(): Boolean = {
  rdd.isEmpty()
}

/**
* Save this RDD as a text file, using string representations of elements.
*/
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,12 @@ abstract class RDD[T: ClassTag](
* */
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)

/**
 * Reports whether this RDD is empty. An RDD with zero partitions is empty without any
 * further work; otherwise a single element is requested via `take(1)` and the RDD is
 * empty exactly when nothing comes back.
 *
 * @return true if and only if the RDD contains no elements at all. Note that an RDD
 *         may be empty even when it has at least 1 partition.
 */
def isEmpty(): Boolean = {
  if (partitions.length == 0) {
    true
  } else {
    take(1).length == 0
  }
}

/**
* Save this RDD as a text file, using string representations of elements.
*/
Expand Down
21 changes: 21 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,27 @@ public void take() {
rdd.takeSample(false, 2, 42);
}

@Test
public void isEmpty() {
  // Predicate that rejects every element of {1, 2, 3}.
  Function<Integer, Boolean> isNegative = new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer i) {
      return i < 0;
    }
  };
  // Predicate that rejects only the first element of {1, 2, 3}.
  Function<Integer, Boolean> isGreaterThanOne = new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer i) {
      return i > 1;
    }
  };

  // RDDs built without any data are empty.
  Assert.assertTrue(sc.emptyRDD().isEmpty());
  Assert.assertTrue(sc.parallelize(new ArrayList<Integer>()).isEmpty());

  // A single element is enough to make the RDD non-empty.
  Assert.assertFalse(sc.parallelize(Arrays.asList(1)).isEmpty());

  // Three partitions requested, but the filter removes every element.
  Assert.assertTrue(
    sc.parallelize(Arrays.asList(1, 2, 3), 3).filter(isNegative).isEmpty());

  // Some elements survive the filter, so the RDD is not empty.
  Assert.assertFalse(
    sc.parallelize(Arrays.asList(1, 2, 3)).filter(isGreaterThanOne).isEmpty());
}

@Test
public void cartesian() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
Expand Down
9 changes: 9 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
assert(!nums.isEmpty())
assert(nums.max() === 4)
assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
Expand Down Expand Up @@ -545,6 +546,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(sortedTopK === nums.sorted(ord).take(5))
}

test("isEmpty") {
assert(sc.emptyRDD.isEmpty())
assert(sc.parallelize(Seq[Int]()).isEmpty())
assert(!sc.parallelize(Seq(1)).isEmpty())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this tests the case where there are multiple partitions but no data in any of the partitions. Maybe add something like

assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the sc.parallelize(Seq[Int]()) case actually has multiple partitions, but I'll add this too. Also, I'll check the case where the first partition is empty but others aren't.

assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty())
assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty())
}

test("sample preserves partitioner") {
val partitioner = new HashPartitioner(2)
val rdd = sc.parallelize(Seq((0, 1), (2, 3))).partitionBy(partitioner)
Expand Down
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ object MimaExcludes {
// SPARK-5166 Spark SQL API stabilization
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit")
) ++ Seq(
// SPARK-5270
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaRDDLike.isEmpty")
)

case v if v.startsWith("1.2") =>
Expand Down
12 changes: 12 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,18 @@ def first(self):
return rs[0]
raise ValueError("RDD is empty")

def isEmpty(self):
    """
    Returns true if and only if the RDD contains no elements at all. Note that an RDD
    may be empty even when it has at least 1 partition.

    >>> sc.parallelize([]).isEmpty()
    True
    >>> sc.parallelize([1]).isEmpty()
    False
    """
    # No partitions means no elements; answer without calling take().
    if self._jrdd.partitions().size() == 0:
        return True
    # Otherwise the RDD is empty exactly when take(1) yields nothing.
    return len(self.take(1)) == 0

def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
Expand Down