Skip to content

Commit ef6b729

Browse files
committed
Change sort flag into Option
1 parent 3f6eeed commit ef6b729

File tree

3 files changed

+7
-7
lines changed

3 files changed

+7
-7
lines changed

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class ShuffleDependency[K, V, C](
6363
val keyOrdering: Option[Ordering[K]] = None,
6464
val aggregator: Option[Aggregator[K, V, C]] = None,
6565
val mapSideCombine: Boolean = false,
66-
val ascending: Boolean = true)
66+
val ascending: Option[Boolean] = None)
6767
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
6868

6969
val shuffleId: Int = rdd.context.newShuffleId()

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
5151

5252
private var mapSideCombine: Boolean = false
5353

54-
private var ascending: Boolean = true
54+
private var ascending: Option[Boolean] = None
5555

5656
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
5757
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
@@ -79,7 +79,7 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
7979

8080
/** Set sort flag for RDD's sorting. */
8181
def setSortFlag(ascending: Boolean): ShuffledRDD[K, V, C, P] = {
82-
this.ascending = ascending
82+
this.ascending = Option(ascending)
8383
this
8484
}
8585

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,16 @@ class HashShuffleReader[K, C](
5050
iter
5151
}
5252

53-
dep.keyOrdering.map { ordering =>
53+
val sortedIter = for (asc <- dep.ascending; ordering <- dep.keyOrdering) yield {
5454
val buf = aggregatedIter.toArray
55-
if (dep.ascending) {
55+
if (asc) {
5656
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
5757
} else {
5858
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
5959
}
60-
}.getOrElse {
61-
aggregatedIter
6260
}
61+
62+
sortedIter.getOrElse(aggregatedIter)
6363
}
6464

6565
/** Close this reader */

0 commit comments

Comments
 (0)