@@ -304,16 +304,30 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
.doc("When true and adaptive execution is enabled, this enables reducing the number of " +
"post-shuffle partitions based on map output statistics.")
.booleanConf
.createWithDefault(true)

val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
.internal()
.doc("The advisory minimal number of post-shuffle partitions provided to " +
"ExchangeCoordinator. This setting is used in our test to make sure we " +
"have enough parallelism to expose issues that will not be exposed with a " +
"single partition. When the value is a non-positive value, this setting will " +
"not be provided to ExchangeCoordinator.")
.doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
.createWithDefault(-1)
.checkValue(_ > 0, "The minimum shuffle partition number " +
"must be a positive integer.")
.createWithDefault(1)

val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
.doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
"This is used as the initial number of pre-shuffle partitions. By default it equals to " +
"spark.sql.shuffle.partitions")
.intConf
.checkValue(_ > 0, "The maximum shuffle partition number " +
"must be a positive integer.")
.createOptional

val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1916,9 +1930,14 @@ class SQLConf extends Serializable with Logging {
def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)

def maxNumPostShufflePartitions: Int =
getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
Contributor

We may want to set the default value here to something much higher (Int.Max?). The upper bound here is determined by the shuffles for which we are trying to reduce the number of partitions.

Contributor Author

We actually use this as the initial shuffle partition number, which will be set in ShuffleExchangeExec, so it is expected to be a reasonable value rather than Int.Max. It may be a little misleading if a user is not aware that it is used as the initial shuffle partition number.

Contributor

Why would we have two confs for adaptive and non-adaptive execution?

Contributor Author

People usually have already tuned spark.sql.shuffle.partitions for each workload they run periodically. The initial shuffle partition number for adaptive execution lets users set a global value (relatively large) that can work for all queries. If the user doesn't set it, it defaults to spark.sql.shuffle.partitions.
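A minimal sketch of how these confs could be combined, using only the keys that appear in this diff plus spark.sql.shuffle.partitions (the conf that turns adaptive execution itself on is separate and not shown here):

import org.apache.spark.sql.SparkSession

// Sketch only: a relatively large global value is used as the initial
// (pre-shuffle) partition number; if it is left unset, it falls back to
// spark.sql.shuffle.partitions.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("reduce-post-shuffle-partitions-sketch")
  .config("spark.sql.shuffle.partitions", "200")
  .config("spark.sql.adaptive.reducePostShufflePartitions.enabled", "true")
  .config("spark.sql.adaptive.maxNumPostShufflePartitions", "2000")
  .getOrCreate()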


def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
import org.apache.spark.sql.execution.adaptive.rule.ReduceNumShufflePartitions
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.internal.SQLConf
@@ -82,6 +83,7 @@ case class AdaptiveSparkPlanExec(
// A list of physical optimizer rules to be applied to a new stage before its execution. These
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReduceNumShufflePartitions(conf),
CollapseCodegenStages(conf)
)

@@ -161,9 +161,21 @@ case class BroadcastQueryStageExec(
}
}

object ShuffleQueryStageExec {
/**
* Returns true if the plan is a [[ShuffleQueryStageExec]] or a reused [[ShuffleQueryStageExec]].
*/
def isShuffleQueryStageExec(plan: SparkPlan): Boolean = plan match {
case r: ReusedQueryStageExec => isShuffleQueryStageExec(r.plan)
case _: ShuffleQueryStageExec => true
case _ => false
}
}

object BroadcastQueryStageExec {
/**
* Returns if the plan is a [[BroadcastQueryStageExec]] or a reused [[BroadcastQueryStageExec]].
* Returns true if the plan is a [[BroadcastQueryStageExec]] or a reused
* [[BroadcastQueryStageExec]].
*/
def isBroadcastQueryStageExec(plan: SparkPlan): Boolean = plan match {
case r: ReusedQueryStageExec => isBroadcastQueryStageExec(r.plan)
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.adaptive.rule

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration

import org.apache.spark.MapOutputStatistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.ThreadUtils

/**
* A rule to adjust the post shuffle partitions based on the map output statistics.
*
* The strategy used to determine the number of post-shuffle partitions is described as follows.
* To determine the number of post-shuffle partitions, we have a target input size for a
* post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do
* a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single
* post-shuffle partition until adding another pre-shuffle partition would cause the size of a
* post-shuffle partition to be greater than the target size.
*
* For example, we have two stages with the following pre-shuffle partition size statistics:
* stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
* stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
* assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
* which are:
* - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
* - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
* - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
* - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
*/
case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {

override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.reducePostShufflePartitionsEnabled) {
return plan
Contributor

return is not needed.

Contributor Author

Without return, I'd need an else with more indentation. I saw a few rules written like this, so I was just following that pattern.

}
if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
Contributor

How about first collecting all the leaves, then checking whether they are all QueryStageExec nodes, and only if they are, collecting the map output statistics? That seems more efficient.

Contributor

+1

Contributor Author

Good point!

Contributor

When the leaf node is a BroadcastQueryStageExec, do we need to adjust the reducer number?

Contributor (@hvanhovell, Jun 27, 2019)

We are ignoring BroadcastQueryStageExec, right?

Contributor

We can have situations with BHJ where:

  1. The BHJ comes from compile time and one side is a broadcast stage;
  2. The BHJ comes from a former AQE optimization, so one side is a shuffle stage and the other is a broadcast stage.

I believe scenario 2 may still be applicable here, so maybe we can change the condition to "isAllQueryStage && shuffleStageCount > 0"?

Contributor

Obviously we don't want to adjust num shuffle partitions if there is no shuffle in this stage, so +1 to change the condition to isAllQueryStage && shuffleStageCount > 0.

Contributor Author

I believe we are safe here. After checking isAllQueryStage, we get the shuffle metrics from the shuffle stages, and we only adjust the number of shuffle partitions if there is at least one shuffle metric.

// If not all leaf nodes are query stages, it's not safe to reduce the number of

Does this refer to cases like https://github.com/Intel-bigdata/spark-adaptive/pull/54?
nit: Maybe the comment could say "If not all leaf nodes are query stages, for example when a depending stage is already bucketed, there isn't a shuffle or query stage for that branch", to make it clear, because normally all leaves are query stages.

Contributor Author

Yes, that case is covered. There are many cases where the leaves are not query stages; for example, in a stage that does a table scan, the leaf is a table scan operator. The comment seems to cover these.

// shuffle partitions, because we may break the assumption that all children of a spark plan
// have same number of output partitions.
plan
} else {
val shuffleStages = plan.collect {
Contributor

This does not work well with Union, right? Is there a way we can identify subtrees in the stage where all the input partition counts are the same?

Contributor Author

Right, Union may have child stages with different partition numbers; see the unit test added. Currently we check the shuffle metrics to ensure the pre-shuffle partition numbers of the stages are the same.

Contributor

This can get pretty complicated if we have multiple Unions and Joins in one stage. For now I'm OK with this simple approach, where we always require all shuffles in one stage to have the same number of partitions. BTW this is an existing issue.

case stage: ShuffleQueryStageExec => stage
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
}
val shuffleMetrics = shuffleStages.map { stage =>
val metricsFuture = stage.mapOutputStatisticsFuture
assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
}

// `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = shuffleMetrics.filter(_ != null)
// We may get different pre-shuffle partition number if user calls repartition manually.
// We don't reduce shuffle partition number in that case.
val distinctNumPreShufflePartitions =
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

There are two conditions under which we don't do adaptive execution and the plan is returned directly.
Maybe it would be better to aggregate these conditions?

Contributor Author

What do you mean by aggregate?


Never mind~


if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {

If a user calls val datasetAfter = datasetBefore.repartition(300), they want the post-shuffle partition number to be exactly 300. But in this case we will go inside this branch, and the partition merging on the reduce side will break the user's expectation. cc @cloud-fan
Should we check the original operation, and skip adaptive execution if it is a repartition?

Contributor Author

This was also discussed in the original PR. For df.repartition(500), AE does use the specified number for the repartition. That is, each stage will write the map output with 500 partitions. However, in the follow-up stage, AE may launch fewer than 500 tasks, as one task can process multiple contiguous blocks. I think we can still do adaptive execution when it is possible. We can add an option to disable AE for repartition later if that is necessary.
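To illustrate the behavior described above, a rough sketch (it assumes a running SparkSession named spark with this PR's confs enabled; exact task counts depend on data size and the target post-shuffle input size):

import org.apache.spark.sql.functions.col

// The map output of the repartition shuffle is still written with 500
// partitions, but the stage that reads it may launch fewer than 500 tasks,
// because a CoalescedShuffleReaderExec lets one task read several
// contiguous shuffle blocks.
val repartitioned = spark.range(0, 1000000).repartition(500)
val counted = repartitioned.groupBy(col("id") % 10).count()
counted.collect()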

@gczsjdy (Jul 3, 2019)

It would be good to add a config entry; I think it's best to make it internal (hidden). At the same time, we could add a function called repartitionWithAdvice; the difference between it and repartition is whether the newly added config entry is set to true.

repartitionWithAdvice means adaptive execution is enabled, as opposed to the exact repartition.

Contributor

Do we have this problem in ExchangeCoordinator? At the physical planning phase we have no idea whether the shuffle comes from a df.repartition or was added by EnsureRequirements. We may need to add a boolean flag in ShuffleExchangeExec to indicate that this shuffle can't change its number of partitions. Maybe we can fix it in a follow-up.

val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
Contributor

We may need to add a check that all the pre-shuffle partition counts are the same before calling the estimatePartitionStartIndices method, to avoid the assert exception at line 130.

Contributor Author

Good catch. I'll add the check and the unit test.

// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
// even for shuffle exchange whose input RDD has 0 partition, we should still update its
// `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
// number of output partitions.
case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
CoalescedShuffleReaderExec(stage, partitionStartIndices)
}
} else {
plan
}
}
}

/**
* Estimates partition start indices for post-shuffle partitions based on
* mapOutputStatistics provided by all pre-shuffle stages.
*/
// visible for testing.
private[sql] def estimatePartitionStartIndices(
mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
// If minNumPostShufflePartitions is defined, it is possible that we need to use a
// value less than advisoryTargetPostShuffleInputSize as the target input size of
// a post shuffle task.
val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
// The max at here is to make sure that when we have an empty table, we
// only have a single post-shuffle partition.
// There is no particular reason that we pick 16. We just need a number to
// prevent maxPostShuffleInputSize from being set to 0.
val maxPostShuffleInputSize = math.max(
math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16)
val targetPostShuffleInputSize =
math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

logInfo(
s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
s"targetPostShuffleInputSize $targetPostShuffleInputSize.")

// Make sure we do get the same number of pre-shuffle partitions for those stages.
val distinctNumPreShufflePartitions =
mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
// The reason that we are expecting a single value of the number of pre-shuffle partitions
// is that when we add Exchanges, we set the number of pre-shuffle partitions
// (i.e. map output partitions) using a static setting, which is the value of
// spark.sql.shuffle.partitions. Even if two input RDDs are having different
// number of partitions, they will have the same number of pre-shuffle partitions
// (i.e. map output partitions).
assert(
distinctNumPreShufflePartitions.length == 1,
"There should be only one distinct value of the number pre-shuffle partitions " +
"among registered Exchange operator.")
val numPreShufflePartitions = distinctNumPreShufflePartitions.head

val partitionStartIndices = ArrayBuffer[Int]()
// The first element of partitionStartIndices is always 0.
partitionStartIndices += 0

var postShuffleInputSize = 0L

var i = 0
while (i < numPreShufflePartitions) {
// We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
// Then, we add the total size to postShuffleInputSize.
var nextShuffleInputSize = 0L
var j = 0
while (j < mapOutputStatistics.length) {
nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
j += 1
}

Contributor

var nextShuffleInputSize = 0L
var j = 0
while (j < mapOutputStatistics.length) {
  nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
  j += 1
}

=>
val nextShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId(i)).sum

Contributor Author

This code was originally in ExchangeCoordinator. I'm not sure whether it was written this way on purpose, to avoid creating the additional arrays your version would allocate. So let's leave it as is.

// If including the nextShuffleInputSize would exceed the target partition size, then start a
// new partition.
if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
partitionStartIndices += i
// reset postShuffleInputSize.
postShuffleInputSize = nextShuffleInputSize
} else {
postShuffleInputSize += nextShuffleInputSize
}

i += 1
}

partitionStartIndices.toArray
}
}

case class CoalescedShuffleReaderExec(
child: QueryStageExec,
partitionStartIndices: Array[Int]) extends UnaryExecNode {

override def output: Seq[Attribute] = child.output

override def doCanonicalize(): SparkPlan = child.canonicalized

override def outputPartitioning: Partitioning = {
UnknownPartitioning(partitionStartIndices.length)
}

private var cachedShuffleRDD: ShuffledRowRDD = null

override protected def doExecute(): RDD[InternalRow] = {
if (cachedShuffleRDD == null) {
cachedShuffleRDD = child match {
case stage: ShuffleQueryStageExec =>
stage.plan.createShuffledRDD(Some(partitionStartIndices))
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
stage.plan.createShuffledRDD(Some(partitionStartIndices))
}
}
cachedShuffleRDD
}
}
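To connect the class doc's example to estimatePartitionStartIndices, a test-style sketch (not part of this PR; it assumes the MapOutputStatistics(shuffleId, bytesByPartitionId) constructor and the targetPostShuffleInputSize conf key, both defined outside this diff, and it needs test scope since those members are package-private):

import org.apache.spark.MapOutputStatistics
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf
// Assumed conf key; the class doc example uses a 128 MiB target.
conf.setConfString("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
  (128 * 1024 * 1024).toString)

val mb = 1024L * 1024L
val stage1 = new MapOutputStatistics(0, Array(100L, 20L, 100L, 10L, 30L).map(_ * mb))
val stage2 = new MapOutputStatistics(1, Array(10L, 10L, 70L, 5L, 5L).map(_ * mb))

// Combined per-partition sizes are [110, 30, 170, 15, 35] MiB, so the packing
// yields start indices [0, 1, 2, 3], i.e. the four post-shuffle partitions
// described in the class doc.
val indices = ReduceNumShufflePartitions(conf).estimatePartitionStartIndices(Array(stage1, stage2))
assert(indices.sameElements(Array(0, 1, 2, 3)))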