@@ -59,14 +59,6 @@ case class Exchange(
5959
6060 override def output : Seq [Attribute ] = child.output
6161
62- private val sortBasedShuffleOn = SparkEnv .get.shuffleManager.isInstanceOf [SortShuffleManager ]
63-
64- private val bypassMergeThreshold =
65- child.sqlContext.sparkContext.conf.getInt(" spark.shuffle.sort.bypassMergeThreshold" , 200 )
66-
67- private val serializeMapOutputs =
68- child.sqlContext.sparkContext.conf.getBoolean(" spark.shuffle.sort.serializeMapOutputs" , true )
69-
7062 /**
7163 * Determines whether records must be defensively copied before being sent to the shuffle.
7264 * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -91,7 +83,11 @@ case class Exchange(
9183 // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
9284 // passed instead of directly passing the number of partitions in order to guard against
9385 // corner-cases where a partitioner constructed with `numPartitions` partitions may output
94- // fewer partitions (like RangeParittioner, for example).
86+ // fewer partitions (like RangePartitioner, for example).
87+ val conf = child.sqlContext.sparkContext.conf
88+ val sortBasedShuffleOn = SparkEnv .get.shuffleManager.isInstanceOf [SortShuffleManager ]
89+ val bypassMergeThreshold = conf.getInt(" spark.shuffle.sort.bypassMergeThreshold" , 200 )
90+ val serializeMapOutputs = conf.getBoolean(" spark.shuffle.sort.serializeMapOutputs" , true )
9591 if (newOrdering.nonEmpty) {
9692 // If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`,
9793 // which requires a defensive copy.
0 commit comments