@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
+import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
 import org.apache.spark.rdd.{RDD, ShuffledRDD}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.{SQLContext, Row}
@@ -59,11 +59,62 @@ case class Exchange(
 
   override def output: Seq[Attribute] = child.output
 
-  /** We must copy rows when sort based shuffle is on */
-  protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-
-  private val bypassMergeThreshold =
-    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+  /**
+   * Determines whether records must be defensively copied before being sent to the shuffle.
+   * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
+   * shuffle code assumes that objects are immutable and hence does not perform its own defensive
+   * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
+   * order to properly shuffle the output of these operators, we need to perform our own copying
+   * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
+   * whenever possible. This method encapsulates the logic for choosing when to copy.
+   *
+   * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
+   * have to rely on knowledge of core internals here in SQL.
+   *
+   * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
+   *
+   * @param partitioner the partitioner for the shuffle
+   * @param serializer the serializer that will be used to write rows
+   * @return true if rows should be copied before being shuffled, false otherwise
+   */
+  private def needToCopyObjectsBeforeShuffle(
+      partitioner: Partitioner,
+      serializer: Serializer): Boolean = {
+    // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
+    // passed instead of directly passing the number of partitions in order to guard against
+    // corner-cases where a partitioner constructed with `numPartitions` partitions may output
+    // fewer partitions (like RangePartitioner, for example).
+    val conf = child.sqlContext.sparkContext.conf
+    val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
+    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val serializeMapOutputs = conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true)
+    if (newOrdering.nonEmpty) {
+      // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
+      // which requires a defensive copy.
+      true
+    } else if (sortBasedShuffleOn) {
+      // Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory.
+      // However, there are two special cases where we can avoid the copy, described below:
+      if (partitioner.numPartitions <= bypassMergeThreshold) {
+        // If the number of output partitions is sufficiently small, then Spark will fall back to
+        // the old hash-based shuffle write path which doesn't buffer deserialized records.
+        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
+        false
+      } else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) {
+        // SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting
+        // them. This optimization is guarded by a feature flag and is only applied in cases where
+        // the shuffle dependency does not specify an ordering and the record serializer has
+        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        false
+      } else {
+        // None of the special cases held, so we must copy.
+        true
+      }
+    } else {
+      // We're using hash-based shuffle, so we don't need to copy.
+      false
+    }
+  }
 
   private val keyOrdering = {
     if (newOrdering.nonEmpty) {
@@ -81,7 +132,7 @@ case class Exchange(
 
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
-  def serializer(
+  private def getSerializer(
       keySchema: Array[DataType],
       valueSchema: Array[DataType],
       hasKeyOrdering: Boolean,
@@ -112,17 +163,12 @@ case class Exchange(
   protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
-        // TODO: Eliminate redundant expressions in grouping key and value.
-        // This is a workaround for SPARK-4479. When:
-        // 1. sort based shuffle is on, and
-        // 2. the partition number is under the merge threshold, and
-        // 3. no ordering is required
-        // we can avoid the defensive copies to improve performance. In the long run, we probably
-        // want to include information in shuffle dependencies to indicate whether elements in the
-        // source RDD should be copied.
-        val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold
-
-        val rdd = if (willMergeSort || newOrdering.nonEmpty) {
+        val keySchema = expressions.map(_.dataType).toArray
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions)
+        val part = new HashPartitioner(numPartitions)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -134,63 +180,61 @@ case class Exchange(
             iter.map(r => mutablePair.update(hashExpressions(r), r))
           }
         }
-        val part = new HashPartitioner(numPartitions)
-        val shuffled =
-          if (newOrdering.nonEmpty) {
-            new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering)
-          } else {
-            new ShuffledRDD[Row, Row, Row](rdd, part)
-          }
-        val keySchema = expressions.map(_.dataType).toArray
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(
-          serializer(keySchema, valueSchema, newOrdering.nonEmpty, numPartitions))
-
+        val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part)
+        if (newOrdering.nonEmpty) {
+          shuffled.setKeyOrdering(keyOrdering)
+        }
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case RangePartitioning(sortingExpressions, numPartitions) =>
-        val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) {
-          child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null)) }
-        } else {
-          child.execute().mapPartitions { iter =>
-            val mutablePair = new MutablePair[Row, Null](null, null)
-            iter.map(row => mutablePair.update(row, null))
+        val keySchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(keySchema, null, newOrdering.nonEmpty, numPartitions)
+
+        val childRdd = child.execute()
+        val part: Partitioner = {
+          // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
+          // partition bounds. To get accurate samples, we need to copy the mutable keys.
+          val rddForSampling = childRdd.mapPartitions { iter =>
+            val mutablePair = new MutablePair[Row, Null]()
+            iter.map(row => mutablePair.update(row.copy(), null))
           }
+          // TODO: RangePartitioner should take an Ordering.
+          implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+          new RangePartitioner(numPartitions, rddForSampling, ascending = true)
         }
 
-        // TODO: RangePartitioner should take an Ordering.
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
-        val part = new RangePartitioner(numPartitions, rdd, ascending = true)
-        val shuffled =
-          if (newOrdering.nonEmpty) {
-            new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering)
-          } else {
-            new ShuffledRDD[Row, Null, Null](rdd, part)
+        val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+          childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null)) }
+        } else {
+          childRdd.mapPartitions { iter =>
+            val mutablePair = new MutablePair[Row, Null]()
+            iter.map(row => mutablePair.update(row, null))
           }
-        val keySchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(
-          serializer(keySchema, null, newOrdering.nonEmpty, numPartitions))
+        }
 
+        val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
+        if (newOrdering.nonEmpty) {
+          shuffled.setKeyOrdering(keyOrdering)
+        }
+        shuffled.setSerializer(serializer)
         shuffled.map(_._1)
 
       case SinglePartition =>
-        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
-        // operators like `TakeOrdered` may require an ordering within the partition, and currently
-        // `SinglePartition` doesn't include ordering information.
-        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
-        val rdd = if (sortBasedShuffleOn) {
+        val valueSchema = child.output.map(_.dataType).toArray
+        val serializer = getSerializer(null, valueSchema, hasKeyOrdering = false, 1)
+        val partitioner = new HashPartitioner(1)
+
+        val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {
           child.execute().mapPartitions { iter =>
             val mutablePair = new MutablePair[Null, Row]()
             iter.map(r => mutablePair.update(null, r))
           }
         }
-        val partitioner = new HashPartitioner(1)
         val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner)
-        val valueSchema = child.output.map(_.dataType).toArray
-        shuffled.setSerializer(serializer(null, valueSchema, false, 1))
+        shuffled.setSerializer(serializer)
         shuffled.map(_._2)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
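
For readers who want to trace the new copy/no-copy decision table without a Spark build, here is a minimal standalone sketch of the logic that needToCopyObjectsBeforeShuffle implements. ShuffleConfig and needToCopy are illustrative stand-ins invented for this sketch (they are not Spark APIs); the bypassMergeThreshold default of 200 and the spark.shuffle.sort.serializeMapOutputs default of true are taken from the patch.

object CopyDecisionSketch {

  // Stand-in for the state the real method reads from SparkEnv and SparkConf.
  final case class ShuffleConfig(
      sortBasedShuffleOn: Boolean,
      bypassMergeThreshold: Int = 200,
      serializeMapOutputs: Boolean = true)

  def needToCopy(
      numPartitions: Int,
      newOrderingNonEmpty: Boolean,
      serializerSupportsRelocation: Boolean,
      conf: ShuffleConfig): Boolean = {
    if (newOrderingNonEmpty) {
      true  // ExternalSorter will buffer rows to sort them, so they must be copied.
    } else if (conf.sortBasedShuffleOn) {
      if (numPartitions <= conf.bypassMergeThreshold) {
        false  // bypass-merge path writes records out without buffering them.
      } else if (conf.serializeMapOutputs && serializerSupportsRelocation) {
        false  // records are serialized before buffering (SPARK-4550).
      } else {
        true   // sort-based shuffle buffers deserialized rows.
      }
    } else {
      false    // hash-based shuffle streams records without buffering.
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = ShuffleConfig(sortBasedShuffleOn = true)
    // 300 partitions, no required ordering, relocatable serializer: no copy needed.
    println(needToCopy(numPartitions = 300, newOrderingNonEmpty = false,
      serializerSupportsRelocation = true, conf = conf))   // false
    // Same shuffle, but a non-relocatable serializer forces the defensive copy.
    println(needToCopy(numPartitions = 300, newOrderingNonEmpty = false,
      serializerSupportsRelocation = false, conf = conf))  // true
  }
}

Under sort-based shuffle, the only no-copy paths are the bypass-merge fallback and the serialized map-output path added by SPARK-4550; every other case still pays for the defensive copy.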