Skip to content

Commit 47b6b38

Browse files
jerryshao authored and mateiz committed
[SPARK-2125] Add sort flag and move sort into shuffle implementations
This patch adds a sort flag into ShuffleDependency and moves sort into hash shuffle implementation. Moving sort into shuffle implementation can give space for other shuffle implementations (like sort-based shuffle) to better optimize sort through shuffle. Author: jerryshao <[email protected]> Closes #1210 from jerryshao/SPARK-2125 and squashes the following commits: 2feaf7b [jerryshao] revert MimaExcludes ceddf75 [jerryshao] add MimaExeclude f674ff4 [jerryshao] Add missing Scope restriction b9fe0dd [jerryshao] Fix some style issues according to comments ef6b729 [jerryshao] Change sort flag into Option 3f6eeed [jerryshao] Fix issues related to unit test 2f552a5 [jerryshao] Minor changes about naming and order c92a281 [jerryshao] Move sort into shuffle implementations
1 parent ab3c6a4 commit 47b6b38

File tree

4 files changed

+35
-12
lines changed

4 files changed

+35
-12
lines changed

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package org.apache.spark
1919

2020
import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.rdd.RDD
22+
import org.apache.spark.rdd.SortOrder.SortOrder
2223
import org.apache.spark.serializer.Serializer
2324
import org.apache.spark.shuffle.ShuffleHandle
2425

@@ -62,7 +63,8 @@ class ShuffleDependency[K, V, C](
6263
val serializer: Option[Serializer] = None,
6364
val keyOrdering: Option[Ordering[K]] = None,
6465
val aggregator: Option[Aggregator[K, V, C]] = None,
65-
val mapSideCombine: Boolean = false)
66+
val mapSideCombine: Boolean = false,
67+
val sortOrder: Option[SortOrder] = None)
6668
extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
6769

6870
val shuffleId: Int = rdd.context.newShuffleId()

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 8 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -57,14 +57,13 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
5757
*/
5858
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
5959
val part = new RangePartitioner(numPartitions, self, ascending)
60-
val shuffled = new ShuffledRDD[K, V, V, P](self, part).setKeyOrdering(ordering)
61-
shuffled.mapPartitions(iter => {
62-
val buf = iter.toArray
63-
if (ascending) {
64-
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
65-
} else {
66-
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
67-
}
68-
}, preservesPartitioning = true)
60+
new ShuffledRDD[K, V, V, P](self, part)
61+
.setKeyOrdering(ordering)
62+
.setSortOrder(if (ascending) SortOrder.ASCENDING else SortOrder.DESCENDING)
6963
}
7064
}
65+
66+
private[spark] object SortOrder extends Enumeration {
67+
type SortOrder = Value
68+
val ASCENDING, DESCENDING = Value
69+
}

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
2121

2222
import org.apache.spark._
2323
import org.apache.spark.annotation.DeveloperApi
24+
import org.apache.spark.rdd.SortOrder.SortOrder
2425
import org.apache.spark.serializer.Serializer
2526

2627
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -51,6 +52,8 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
5152

5253
private var mapSideCombine: Boolean = false
5354

55+
private var sortOrder: Option[SortOrder] = None
56+
5457
/** Set a serializer for this RDD's shuffle, or null to use the default (spark.serializer) */
5558
def setSerializer(serializer: Serializer): ShuffledRDD[K, V, C, P] = {
5659
this.serializer = Option(serializer)
@@ -75,8 +78,15 @@ class ShuffledRDD[K, V, C, P <: Product2[K, C] : ClassTag](
7578
this
7679
}
7780

81+
/** Set sort order for RDD's sorting. */
82+
def setSortOrder(sortOrder: SortOrder): ShuffledRDD[K, V, C, P] = {
83+
this.sortOrder = Option(sortOrder)
84+
this
85+
}
86+
7887
override def getDependencies: Seq[Dependency[_]] = {
79-
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
88+
List(new ShuffleDependency(prev, part, serializer,
89+
keyOrdering, aggregator, mapSideCombine, sortOrder))
8090
}
8191

8292
override val partitioner = Some(part)

core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.shuffle.hash
1919

2020
import org.apache.spark.{InterruptibleIterator, TaskContext}
21+
import org.apache.spark.rdd.SortOrder
2122
import org.apache.spark.serializer.Serializer
2223
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
2324

@@ -38,7 +39,7 @@ class HashShuffleReader[K, C](
3839
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
3940
Serializer.getSerializer(dep.serializer))
4041

41-
if (dep.aggregator.isDefined) {
42+
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
4243
if (dep.mapSideCombine) {
4344
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
4445
} else {
@@ -49,6 +50,17 @@ class HashShuffleReader[K, C](
4950
} else {
5051
iter
5152
}
53+
54+
val sortedIter = for (sortOrder <- dep.sortOrder; ordering <- dep.keyOrdering) yield {
55+
val buf = aggregatedIter.toArray
56+
if (sortOrder == SortOrder.ASCENDING) {
57+
buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
58+
} else {
59+
buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
60+
}
61+
}
62+
63+
sortedIter.getOrElse(aggregatedIter)
5264
}
5365

5466
/** Close this reader */

0 commit comments

Comments (0)