Skip to content

Commit e81c869

Browse files
Yash Dattarxin
authored andcommitted
SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions
takeOrdered should skip reduce step in case mapped RDDs have no partitions. This prevents the mentioned exception : 4. run query SELECT * FROM testTable WHERE market = 'market2' ORDER BY End_Time DESC LIMIT 100; Error trace java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:863) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.reduce(RDD.scala:863) at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1136) Author: Yash Datta <[email protected]> Closes #3830 from saucam/fix_takeorder and squashes the following commits: 5974d10 [Yash Datta] SPARK-4968: takeOrdered to skip reduce step in case mappers return no partitions (cherry picked from commit 9bc0df6) Signed-off-by: Reynold Xin <[email protected]>
1 parent 7604666 commit e81c869

File tree

1 file changed

+10
-5
lines changed
  • core/src/main/scala/org/apache/spark/rdd

1 file changed

+10
-5
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1132,15 +1132,20 @@ abstract class RDD[T: ClassTag](
11321132
if (num == 0) {
11331133
Array.empty
11341134
} else {
1135-
mapPartitions { items =>
1135+
val mapRDDs = mapPartitions { items =>
11361136
// Priority keeps the largest elements, so let's reverse the ordering.
11371137
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
11381138
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
11391139
Iterator.single(queue)
1140-
}.reduce { (queue1, queue2) =>
1141-
queue1 ++= queue2
1142-
queue1
1143-
}.toArray.sorted(ord)
1140+
}
1141+
if (mapRDDs.partitions.size == 0) {
1142+
Array.empty
1143+
} else {
1144+
mapRDDs.reduce { (queue1, queue2) =>
1145+
queue1 ++= queue2
1146+
queue1
1147+
}.toArray.sorted(ord)
1148+
}
11441149
}
11451150
}
11461151

0 commit comments

Comments
 (0)