Commit c5d8d82

Eric Liang authored and mateiz committed
[SPARK-3394] [SQL] Fix crash in TakeOrdered when limit is 0
This resolves https://issues.apache.org/jira/browse/SPARK-3394

Author: Eric Liang <[email protected]>

Closes #2264 from ericl/spark-3394 and squashes the following commits:

c87355b [Eric Liang] refactor
bfb6140 [Eric Liang] change RDD takeOrdered instead
7a51528 [Eric Liang] fix takeordered when limit = 0

(cherry picked from commit 6754570)

Signed-off-by: Matei Zaharia <[email protected]>
1 parent ce4053c commit c5d8d82
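For illustration only (not part of this commit), here is a minimal standalone sketch of the behavior the patch establishes: after the change, RDD.takeOrdered(0) returns an empty array instead of throwing, while nonzero limits behave as before. The object name, local master, and app name below are assumptions made for the example.

import org.apache.spark.{SparkConf, SparkContext}

object TakeOrderedLimitZeroExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup, just to exercise the fixed code path.
    val conf = new SparkConf().setMaster("local[2]").setAppName("takeOrdered-limit-0")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
      // With this fix, a limit of 0 yields an empty result rather than a crash.
      assert(rdd.takeOrdered(0).isEmpty)
      // Nonzero limits still return the smallest elements in ascending order.
      assert(rdd.takeOrdered(3).sameElements(Array(1, 2, 3)))
    } finally {
      sc.stop()
    }
  }
}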

File tree: 2 files changed, +20 −9 lines changed


core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 13 additions & 9 deletions
@@ -1127,15 +1127,19 @@ abstract class RDD[T: ClassTag](
    * @return an array of top elements
    */
   def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    mapPartitions { items =>
-      // Priority keeps the largest elements, so let's reverse the ordering.
-      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
-      Iterator.single(queue)
-    }.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }.toArray.sorted(ord)
+    if (num == 0) {
+      Array.empty
+    } else {
+      mapPartitions { items =>
+        // Priority keeps the largest elements, so let's reverse the ordering.
+        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
+        queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+        Iterator.single(queue)
+      }.reduce { (queue1, queue2) =>
+        queue1 ++= queue2
+        queue1
+      }.toArray.sorted(ord)
+    }
   }
 
   /**

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -521,6 +521,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sortedLowerK === Array(1, 2, 3, 4, 5))
   }
 
+  test("takeOrdered with limit 0") {
+    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+    val rdd = sc.makeRDD(nums, 2)
+    val sortedLowerK = rdd.takeOrdered(0)
+    assert(sortedLowerK.size === 0)
+  }
+
   test("takeOrdered with custom ordering") {
     val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
     implicit val ord = implicitly[Ordering[Int]].reverse
