@@ -153,12 +153,9 @@ object ShuffleExchangeExec {
    * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
    *
    * @param partitioner the partitioner for the shuffle
-   * @param serializer the serializer that will be used to write rows
    * @return true if rows should be copied before being shuffled, false otherwise
    */
-  private def needToCopyObjectsBeforeShuffle(
-      partitioner: Partitioner,
-      serializer: Serializer): Boolean = {
+  private def needToCopyObjectsBeforeShuffle(partitioner: Partitioner): Boolean = {
     // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
     // passed instead of directly passing the number of partitions in order to guard against
     // corner-cases where a partitioner constructed with `numPartitions` partitions may output
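For readers unfamiliar with the corner case this comment refers to: a partitioner may declare `numPartitions = N` yet route keys to fewer than N distinct partitions. A minimal sketch against the public `org.apache.spark.Partitioner` API; the class name is hypothetical:

```scala
import org.apache.spark.Partitioner

// Hypothetical illustration of the corner case: numPartitions says 8, but
// getPartition only ever emits id 0, so partitions 1..7 stay empty.
class SkewedPartitioner(override val numPartitions: Int = 8) extends Partitioner {
  override def getPartition(key: Any): Int = 0
}
```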
@@ -167,22 +164,24 @@ object ShuffleExchangeExec {
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+    val numParts = partitioner.numPartitions
     if (sortBasedShuffleOn) {
-      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
+      if (numParts <= bypassMergeThreshold) {
         // If we're using the original SortShuffleManager and the number of output partitions is
         // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
         // doesn't buffer deserialized records.
         // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
         false
-      } else if (serializer.supportsRelocationOfSerializedObjects) {
+      } else if (numParts <= SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
JoshRosen (Contributor) commented on Apr 19, 2018:
I was almost going to suggest that we should check for both conditions with an && here just as future-proofing in case the serializer was changed, but I can now see why that isn't a huge risk in the current codebase: we always use an UnsafeRowSerializer here now. It was only in the pre-Tungsten era that we could use either UnsafeRowSerializer or SparkSqlSerializer here.
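For illustration, a self-contained sketch of the double-check variant Josh describes; the object and parameter names are hypothetical, and the 2^24 cap mirrors Spark's serialized-mode limit:

```scala
// Hypothetical sketch of the "check both conditions" variant discussed above.
// Not what the PR merged: Spark SQL's exchange always uses UnsafeRowSerializer,
// which supports relocation of serialized objects, so the extra check is
// redundant in the current codebase.
object SerializedShuffleCheckSketch {
  val maxPartitionsForSerializedMode: Int = 1 << 24 // 2^24, serialized-mode cap

  def canUseSerializedShuffle(serializerSupportsRelocation: Boolean, numParts: Int): Boolean =
    serializerSupportsRelocation && numParts <= maxPartitionsForSerializedMode

  def main(args: Array[String]): Unit = {
    println(canUseSerializedShuffle(serializerSupportsRelocation = true, numParts = 200))  // true
    println(canUseSerializedShuffle(serializerSupportsRelocation = false, numParts = 200)) // false
  }
}
```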

         // SPARK-4550 and SPARK-7081 extended sort-based shuffle to serialize individual records
         // prior to sorting them. This optimization is only applied in cases where shuffle
         // dependency does not specify an aggregator or ordering and the record serializer has
-        // certain properties. If this optimization is enabled, we can safely avoid the copy.
+        // certain properties and the number of partitions doesn't exceed the limit. If this
+        // optimization is enabled, we can safely avoid the copy.
         //
-        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
-        // need to check whether the optimization is enabled and supported by our serializer.
+        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, and the
+        // serializer in Spark SQL always satisfies the required properties, so we only need to
+        // check whether the number of partitions exceeds the limit.
         false
       } else {
         // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
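Background on where the new bound comes from (not part of the diff): Tungsten's serialized sort shuffle packs each record's partition id into 24 bits of an 8-byte record pointer, which is why `SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE` is 2^24 = 16,777,216. A sketch of that packing arithmetic, with illustrative names (see `PackedRecordPointer` in Spark for the real layout):

```scala
// Sketch of the 24-bit partition-id packing that motivates the serialized-mode
// cap. Names are illustrative, not Spark's actual API.
object PartitionIdPackingSketch {
  val PartitionIdBits = 24
  val MaxPartitionId: Int = (1 << PartitionIdBits) - 1          // 16777215
  val MaxPartitionsForSerializedMode: Int = MaxPartitionId + 1  // 16777216 = 2^24

  // Pack a partition id into the upper 24 bits of a 64-bit pointer word,
  // leaving the lower 40 bits for a compressed record address.
  def pack(partitionId: Int, compressedAddress: Long): Long = {
    require(partitionId <= MaxPartitionId, s"partition id $partitionId exceeds 24 bits")
    (partitionId.toLong << 40) | (compressedAddress & ((1L << 40) - 1))
  }

  def unpackPartitionId(packed: Long): Int = (packed >>> 40).toInt

  def main(args: Array[String]): Unit = {
    val p = pack(12345, 0xABCDEL)
    assert(unpackPartitionId(p) == 12345)
    println(s"max partitions in serialized mode: $MaxPartitionsForSerializedMode")
  }
}
```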
@@ -298,7 +297,7 @@ object ShuffleExchangeExec {
         rdd
       }
 
-      if (needToCopyObjectsBeforeShuffle(part, serializer)) {
+      if (needToCopyObjectsBeforeShuffle(part)) {
         newRdd.mapPartitionsInternal { iter =>
           val getPartitionKey = getPartitionKeyExtractor()
           iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
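Why the `row.copy()` on this path matters: Spark SQL operators commonly reuse a single mutable row object across an iterator, so any shuffle path that buffers references (rather than serializing immediately) would see every buffered entry alias the most recent row. A small self-contained demonstration of the aliasing hazard in plain Scala, with a toy `MutableRow` standing in for Spark's reused `UnsafeRow`:

```scala
// Demonstrates the object-reuse hazard that needToCopyObjectsBeforeShuffle
// guards against. MutableRow is a toy stand-in, not Spark's class.
class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
  override def toString: String = s"Row($value)"
}

object RowReuseSketch {
  // An iterator that, like many Spark SQL operators, mutates and re-emits ONE row.
  def reusingIterator(values: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    values.iterator.map { v => shared.value = v; shared }
  }

  def main(args: Array[String]): Unit = {
    // Buffering without copy: every element aliases the same object.
    println(reusingIterator(Seq(1, 2, 3)).toArray.mkString(", "))
    // Row(3), Row(3), Row(3)  -- data corrupted by reuse

    // Buffering with copy, as the shuffle path above does via row.copy():
    println(reusingIterator(Seq(1, 2, 3)).map(_.copy()).toArray.mkString(", "))
    // Row(1), Row(2), Row(3)
  }
}
```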