diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala index c02e48c9815f..f8a6f1d0d8cb 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala @@ -25,4 +25,3 @@ package org.apache.spark * (may be inexact due to use of compressed map statuses) */ private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long]) - extends Serializable diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0724c4acf664..2e44ca5315c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -282,19 +282,14 @@ object SQLConf { val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = buildConf("spark.sql.adaptive.minNumPostShufflePartitions") - .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.") - .intConf - .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " + - "must be a positive integer.") - .createWithDefault(1) - - val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS = - buildConf("spark.sql.adaptive.maxNumPostShufflePartitions") - .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.") + .internal() + .doc("The advisory minimal number of post-shuffle partitions provided to " + + "ExchangeCoordinator. This setting is used in our test to make sure we " + + "have enough parallelism to expose issues that will not be exposed with a " + + "single partition. When the value is a non-positive value, this setting will " + + "not be provided to ExchangeCoordinator.") .intConf - .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " + - "must be a positive integer.") - .createWithDefault(500) + .createWithDefault(-1) val SUBEXPRESSION_ELIMINATION_ENABLED = buildConf("spark.sql.subexpressionElimination.enabled") @@ -1783,9 +1778,8 @@ class SQLConf extends Serializable with Logging { def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) - def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) - - def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS) + def minNumPostShufflePartitions: Int = + getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index b766dcb8a38b..427eb85730be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.adaptive.PlanQueryStage import 
org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf @@ -95,11 +95,7 @@ class QueryExecution( * row format conversions as needed. */ protected def prepareForExecution(plan: SparkPlan): SparkPlan = { - if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { - adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)} - } else { - preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)} - } + preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } } /** A sequence of rules that will be applied in order to the physical plan before execution. */ @@ -110,15 +106,6 @@ class QueryExecution( ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) - protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq( - PlanSubqueries(sparkSession), - EnsureRequirements(sparkSession.sessionState.conf), - ReuseSubquery(sparkSession.sessionState.conf), - // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees - // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will - // only transform node in a sub-tree. - PlanQueryStage(sparkSession.sessionState.conf)) - protected def stringOrError[A](f: => A): String = try f.toString catch { case e: AnalysisException => e.toString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index ca46b1e940e4..f554ff0aa775 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.execution.adaptive.QueryStageInput import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -53,7 +52,6 @@ private[execution] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { case ReusedExchangeExec(_, child) => child :: Nil - case i: QueryStageInput => i.childStage :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala deleted file mode 100644 index ab2b6e9dfdec..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.command.ExecutedCommandExec -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType - -/** - * Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a - * QueryStage and a QueryStageInput. If reusing Exchange is enabled, it finds duplicated exchanges - * and uses the same QueryStage for all the references. - */ -case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] { - - def apply(plan: SparkPlan): SparkPlan = { - - val newPlan = if (!conf.exchangeReuseEnabled) { - plan.transformUp { - case e: ShuffleExchangeExec => - ShuffleQueryStageInput(ShuffleQueryStage(e), e.output) - case e: BroadcastExchangeExec => - BroadcastQueryStageInput(BroadcastQueryStage(e), e.output) - } - } else { - // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. - val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]() - - plan.transformUp { - case exchange: Exchange => - val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]()) - val samePlan = sameSchema.find { s => - exchange.sameResult(s.child) - } - if (samePlan.isDefined) { - // Keep the output of this exchange, the following plans require that to resolve - // attributes. - exchange match { - case e: ShuffleExchangeExec => ShuffleQueryStageInput( - samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output) - case e: BroadcastExchangeExec => BroadcastQueryStageInput( - samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output) - } - } else { - val queryStageInput = exchange match { - case e: ShuffleExchangeExec => - ShuffleQueryStageInput(ShuffleQueryStage(e), e.output) - case e: BroadcastExchangeExec => - BroadcastQueryStageInput(BroadcastQueryStage(e), e.output) - } - sameSchema += queryStageInput.childStage - queryStageInput - } - } - } - ResultQueryStage(newPlan) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala deleted file mode 100644 index 85801cc5cd9c..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import java.util.Properties - -import scala.concurrent.{ExecutionContext, Future} -import scala.concurrent.duration.Duration - -import org.apache.spark.{broadcast, MapOutputStatistics, SparkContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange._ -import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate -import org.apache.spark.util.ThreadUtils - -/** - * In adaptive execution mode, an execution plan is divided into multiple QueryStages. Each - * QueryStage is a sub-tree that runs in a single stage. - */ -abstract class QueryStage extends UnaryExecNode { - - var child: SparkPlan - - // Ignore this wrapper for canonicalizing. - override def doCanonicalize(): SparkPlan = child.canonicalized - - override def output: Seq[Attribute] = child.output - - override def outputPartitioning: Partitioning = child.outputPartitioning - - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - - def withLocalProperties[T](sc: SparkContext, properties: Properties)(body: => T): T = { - val oldProperties = sc.getLocalProperties - try { - sc.setLocalProperties(properties) - body - } finally { - sc.setLocalProperties(oldProperties) - } - } - - /** - * Execute childStages and wait until all stages are completed. Use a thread pool to avoid - * blocking on one child stage. - */ - def executeChildStages(): Unit = { - val localProperties = sqlContext.sparkContext.getLocalProperties - - // Handle broadcast stages - val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect { - case bqs: BroadcastQueryStageInput => bqs.childStage - } - val broadcastFutures = broadcastQueryStages.map { queryStage => - Future { - withLocalProperties(sqlContext.sparkContext, localProperties) { - queryStage.prepareBroadcast() - } - }(QueryStage.executionContext) - } - - // Submit shuffle stages - val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect { - case sqs: ShuffleQueryStageInput => sqs.childStage - } - val shuffleStageFutures = shuffleQueryStages.map { queryStage => - Future { - withLocalProperties(sqlContext.sparkContext, localProperties) { - queryStage.execute() - } - }(QueryStage.executionContext) - } - - ThreadUtils.awaitResult( - Future.sequence(broadcastFutures)(implicitly, QueryStage.executionContext), Duration.Inf) - ThreadUtils.awaitResult( - Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf) - } - - /** - * Before executing the plan in this query stage, we execute all child stages, optimize the plan - * in this stage and determine the reducer number based on the child stages' statistics. Finally - * we do a codegen for this query stage and update the UI with the new plan. - */ - def prepareExecuteStage(): Unit = { - // 1. 
Execute childStages - executeChildStages() - // It is possible to optimize this stage's plan here based on the child stages' statistics. - - // 2. Determine reducer number - val queryStageInputs: Seq[ShuffleQueryStageInput] = child.collect { - case input: ShuffleQueryStageInput => input - } - val childMapOutputStatistics = queryStageInputs.map(_.childStage.mapOutputStatistics) - .filter(_ != null).toArray - if (childMapOutputStatistics.length > 0) { - val exchangeCoordinator = new ExchangeCoordinator( - conf.targetPostShuffleInputSize, - conf.minNumPostShufflePartitions) - - val partitionStartIndices = - exchangeCoordinator.estimatePartitionStartIndices(childMapOutputStatistics) - child = child.transform { - case ShuffleQueryStageInput(childStage, output, _) => - ShuffleQueryStageInput(childStage, output, Some(partitionStartIndices)) - } - } - - // 3. Codegen and update the UI - child = CollapseCodegenStages(sqlContext.conf).apply(child) - val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - if (executionId != null && executionId.nonEmpty) { - val queryExecution = SQLExecution.getQueryExecution(executionId.toLong) - sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate( - executionId.toLong, - queryExecution.toString, - SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan))) - } - } - - // Caches the created ShuffleRowRDD so we can reuse that. - private var cachedRDD: RDD[InternalRow] = null - - def executeStage(): RDD[InternalRow] = child.execute() - - /** - * A QueryStage can be reused like Exchange. It is possible that multiple threads try to submit - * the same QueryStage. Use synchronized to make sure it is executed only once. - */ - override def doExecute(): RDD[InternalRow] = synchronized { - if (cachedRDD == null) { - prepareExecuteStage() - cachedRDD = executeStage() - } - cachedRDD - } - - override def executeCollect(): Array[InternalRow] = { - prepareExecuteStage() - child.executeCollect() - } - - override def executeToIterator(): Iterator[InternalRow] = { - prepareExecuteStage() - child.executeToIterator() - } - - override def executeTake(n: Int): Array[InternalRow] = { - prepareExecuteStage() - child.executeTake(n) - } - - override def generateTreeString( - depth: Int, - lastChildren: Seq[Boolean], - append: String => Unit, - verbose: Boolean, - prefix: String = "", - addSuffix: Boolean = false, - maxFields: Int): Unit = { - child.generateTreeString(depth, lastChildren, append, verbose, "*", addSuffix, maxFields) - } -} - -/** - * The last QueryStage of an execution plan. - */ -case class ResultQueryStage(var child: SparkPlan) extends QueryStage - -/** - * A shuffle QueryStage whose child is a ShuffleExchange. - */ -case class ShuffleQueryStage(var child: SparkPlan) extends QueryStage { - - protected var _mapOutputStatistics: MapOutputStatistics = null - - def mapOutputStatistics: MapOutputStatistics = _mapOutputStatistics - - override def executeStage(): RDD[InternalRow] = { - child match { - case e: ShuffleExchangeExec => - val result = e.eagerExecute() - _mapOutputStatistics = e.mapOutputStatistics - result - case _ => throw new IllegalArgumentException( - "The child of ShuffleQueryStage must be a ShuffleExchange.") - } - } -} - -/** - * A broadcast QueryStage whose child is a BroadcastExchangeExec. 
- */ -case class BroadcastQueryStage(var child: SparkPlan) extends QueryStage { - override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - child.executeBroadcast() - } - - private var prepared = false - - def prepareBroadcast() : Unit = synchronized { - if (!prepared) { - executeChildStages() - child = CollapseCodegenStages(sqlContext.conf).apply(child) - // After child stages are completed, prepare() triggers the broadcast. - prepare() - prepared = true - } - } - - override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException( - "BroadcastExchange does not support the execute() code path.") - } -} - -object QueryStage { - private[execution] val executionContext = ExecutionContext.fromExecutorService( - ThreadUtils.newDaemonCachedThreadPool("adaptive-query-stage")) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala deleted file mode 100644 index 2ed9e3f3abce..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageInput.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import org.apache.spark.broadcast -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} - -/** - * QueryStageInput is the leaf node of a QueryStage and is used to hide its child stage. It gets - * the result of its child stage and serves it as the input of the QueryStage. A QueryStage knows - * its child stages by collecting all the QueryStageInputs. - */ -abstract class QueryStageInput extends LeafExecNode { - - def childStage: QueryStage - - // Ignore this wrapper for canonicalizing. - override def doCanonicalize(): SparkPlan = childStage.canonicalized - - // Similar to ReusedExchangeExec, two QueryStageInputs can reference to the same childStage. - // QueryStageInput can have distinct set of output attribute ids from its childStage, we need - // to update the attribute ids in outputPartitioning and outputOrdering. 
- private lazy val updateAttr: Expression => Expression = { - val originalAttrToNewAttr = AttributeMap(childStage.output.zip(output)) - e => e.transform { - case attr: Attribute => originalAttrToNewAttr.getOrElse(attr, attr) - } - } - - override def outputPartitioning: Partitioning = childStage.outputPartitioning match { - case h: HashPartitioning => h.copy(expressions = h.expressions.map(updateAttr)) - case other => other - } - - override def outputOrdering: Seq[SortOrder] = { - childStage.outputOrdering.map(updateAttr(_).asInstanceOf[SortOrder]) - } - - override def generateTreeString( - depth: Int, - lastChildren: Seq[Boolean], - append: String => Unit, - verbose: Boolean, - prefix: String = "", - addSuffix: Boolean = false, - maxFields: Int): Unit = { - childStage.generateTreeString(depth, lastChildren, append, verbose, "*", addSuffix, maxFields) - } -} - -/** - * A QueryStageInput whose child stage is a ShuffleQueryStage. It returns a new ShuffledRowRDD - * based on the the child stage's result RDD and the specified partitionStartIndices. If the - * child stage is reused by another ShuffleQueryStageInput, they can return RDDs with different - * partitionStartIndices. - */ -case class ShuffleQueryStageInput( - childStage: ShuffleQueryStage, - override val output: Seq[Attribute], - partitionStartIndices: Option[Array[Int]] = None) - extends QueryStageInput { - - private lazy val writeMetrics = - SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) - private lazy val readMetrics = - SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) - override lazy val metrics = readMetrics ++ writeMetrics - - override def outputPartitioning: Partitioning = partitionStartIndices.map { - indices => UnknownPartitioning(indices.length) - }.getOrElse(super.outputPartitioning) - - override def doExecute(): RDD[InternalRow] = { - val childRDD = childStage.execute().asInstanceOf[ShuffledRowRDD] - new ShuffledRowRDD(childRDD.dependency, readMetrics, partitionStartIndices) - } -} - -/** A QueryStageInput whose child stage is a BroadcastQueryStage. */ -case class BroadcastQueryStageInput( - childStage: BroadcastQueryStage, - override val output: Seq[Attribute]) - extends QueryStageInput { - - override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - childStage.executeBroadcast() - } - - override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException( - "BroadcastStageInput does not support the execute() code path.") - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 8184baf50b04..d2d5011bbcb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -36,12 +36,107 @@ import org.apache.spark.sql.internal.SQLConf * the input partition ordering requirements are met. 
*/ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { - private def defaultNumPreShufflePartitions: Int = - if (conf.adaptiveExecutionEnabled) { - conf.maxNumPostShufflePartitions - } else { - conf.numShufflePartitions - } + private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions + + private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize + + private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled + + private def minNumPostShufflePartitions: Option[Int] = { + val minNumPostShufflePartitions = conf.minNumPostShufflePartitions + if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None + } + + /** + * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled + * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]]. + */ + private def withExchangeCoordinator( + children: Seq[SparkPlan], + requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = { + val supportsCoordinator = + if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) { + // Right now, ExchangeCoordinator only support HashPartitionings. + children.forall { + case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true + case child => + child.outputPartitioning match { + case hash: HashPartitioning => true + case collection: PartitioningCollection => + collection.partitionings.forall(_.isInstanceOf[HashPartitioning]) + case _ => false + } + } + } else { + // In this case, although we do not have Exchange operators, we may still need to + // shuffle data when we have more than one children because data generated by + // these children may not be partitioned in the same way. + // Please see the comment in withCoordinator for more details. + val supportsDistribution = requiredChildDistributions.forall { dist => + dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution] + } + children.length > 1 && supportsDistribution + } + + val withCoordinator = + if (adaptiveExecutionEnabled && supportsCoordinator) { + val coordinator = + new ExchangeCoordinator( + targetPostShuffleInputSize, + minNumPostShufflePartitions) + children.zip(requiredChildDistributions).map { + case (e: ShuffleExchangeExec, _) => + // This child is an Exchange, we need to add the coordinator. + e.copy(coordinator = Some(coordinator)) + case (child, distribution) => + // If this child is not an Exchange, we need to add an Exchange for now. + // Ideally, we can try to avoid this Exchange. However, when we reach here, + // there are at least two children operators (because if there is a single child + // and we can avoid Exchange, supportsCoordinator will be false and we + // will not reach here.). Although we can make two children have the same number of + // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different. + // For example, let's say we have the following plan + // Join + // / \ + // Agg Exchange + // / \ + // Exchange t2 + // / + // t1 + // In this case, because a post-shuffle partition can include multiple pre-shuffle + // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes + // after shuffle. So, even we can use the child Exchange operator of the Join to + // have a number of post-shuffle partitions that matches the number of partitions of + // Agg, we cannot say these two children are partitioned in the same way. 
+ // Here is another case + // Join + // / \ + // Agg1 Agg2 + // / \ + // Exchange1 Exchange2 + // / \ + // t1 t2 + // In this case, two Aggs shuffle data with the same column of the join condition. + // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same + // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2 + // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle + // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its + // pre-shuffle partitions by using another partitionStartIndices [0, 4]. + // So, Agg1 and Agg2 are actually not co-partitioned. + // + // It will be great to introduce a new Partitioning to represent the post-shuffle + // partitions when one post-shuffle partition includes multiple pre-shuffle partitions. + val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions) + assert(targetPartitioning.isInstanceOf[HashPartitioning]) + ShuffleExchangeExec(targetPartitioning, child, Some(coordinator)) + } + } else { + // If we do not need ExchangeCoordinator, the original children are returned. + children + } + + withCoordinator + } private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution @@ -94,7 +189,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { val defaultPartitioning = distribution.createPartitioning(targetNumPartitions) child match { // If child is an exchange, we replace it with a new one having defaultPartitioning. - case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c) + case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c) case _ => ShuffleExchangeExec(defaultPartitioning, child) } } @@ -103,6 +198,15 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } } + // Now, we need to add ExchangeCoordinator if necessary. + // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges. + // However, with the way that we plan the query, we do not have a place where we have a + // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator + // at here for now. + // Once we finish https://issues.apache.org/jira/browse/SPARK-10665, + // we can first add Exchanges and then add coordinator once we have a DAG of query fragments. + children = withExchangeCoordinator(children, requiredChildDistributions) + // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. @@ -191,7 +295,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = plan.transformUp { // TODO: remove this after we create a physical operator for `RepartitionByExpression`. 
- case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) => + case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) => child.outputPartitioning match { case lower: HashPartitioning if upper.semanticEquals(lower) => child case _ => operator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index 5a44ec85d5a1..e4ec76f0b9a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -17,31 +17,60 @@ package org.apache.spark.sql.execution.exchange +import java.util.{HashMap => JHashMap, Map => JMap} +import javax.annotation.concurrent.GuardedBy + import scala.collection.mutable.ArrayBuffer -import org.apache.spark.MapOutputStatistics +import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction} import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} /** * A coordinator used to determines how we shuffle data between stages generated by Spark SQL. * Right now, the work of this coordinator is to determine the number of post-shuffle partitions * for a stage that needs to fetch shuffle data from one or multiple stages. * - * A coordinator is constructed with two parameters, `targetPostShuffleInputSize`, - * and `minNumPostShufflePartitions`. + * A coordinator is constructed with three parameters, `numExchanges`, + * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`. + * - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be + * registered to this coordinator. So, when we start to do any actual work, we have a way to + * make sure that we have got expected number of [[ShuffleExchangeExec]]s. * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's * input data size. With this parameter, we can estimate the number of post-shuffle partitions. * This parameter is configured through * `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`. - * - `minNumPostShufflePartitions` is used to make sure that there are at least - * `minNumPostShufflePartitions` post-shuffle partitions. + * - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator + * will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle + * partitions. + * + * The workflow of this coordinator is described as follows: + * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator, + * if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator. + * This happens in the `doPrepare` method. + * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this + * coordinator will call `postShuffleRDD` to get its corresponding post-shuffle + * [[ShuffledRowRDD]]. + * If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]] + * will immediately get its corresponding post-shuffle [[ShuffledRowRDD]]. + * - If this coordinator has not made the decision on how to shuffle data, it will ask those + * registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. 
Then, based on the + * size statistics of pre-shuffle partitions, this coordinator will determine the number of + * post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices + * to a single post-shuffle partition whenever necessary. + * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered + * [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this + * coordinator can lookup the corresponding [[RDD]]. * * The strategy used to determine the number of post-shuffle partitions is described as follows. * To determine the number of post-shuffle partitions, we have a target input size for a - * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do - * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single - * post-shuffle partition until adding another pre-shuffle partition would cause the size of a - * post-shuffle partition to be greater than the target size. + * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages + * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics + * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until + * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be + * greater than the target size. * * For example, we have two stages with the following pre-shuffle partition size statistics: * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB] @@ -55,28 +84,60 @@ import org.apache.spark.internal.Logging */ class ExchangeCoordinator( advisoryTargetPostShuffleInputSize: Long, - minNumPostShufflePartitions: Int = 1) + minNumPostShufflePartitions: Option[Int] = None) extends Logging { + // The registered Exchange operators. + private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]() + + // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the + // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is + // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails + // in `doEstimationIfNecessary`. + private[this] lazy val numExchanges = exchanges.size + + // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator. + private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] = + new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) + + // A boolean that indicates if this coordinator has made decision on how to shuffle data. + // This variable will only be updated by doEstimationIfNecessary, which is protected by + // synchronized. + @volatile private[this] var estimated: Boolean = false + + /** + * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed + * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator. + */ + @GuardedBy("this") + def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized { + exchanges += exchange + } + + def isEstimated: Boolean = estimated + /** * Estimates partition start indices for post-shuffle partitions based on * mapOutputStatistics provided by all pre-shuffle stages. 
*/ def estimatePartitionStartIndices( mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { - // If minNumPostShufflePartitions is defined, it is possible that we need to use a // value less than advisoryTargetPostShuffleInputSize as the target input size of // a post shuffle task. - val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum - // The max at here is to make sure that when we have an empty table, we - // only have a single post-shuffle partition. - // There is no particular reason that we pick 16. We just need a number to - // prevent maxPostShuffleInputSize from being set to 0. - val maxPostShuffleInputSize = math.max( - math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16) - val targetPostShuffleInputSize = - math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) + val targetPostShuffleInputSize = minNumPostShufflePartitions match { + case Some(numPartitions) => + val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum + // The max at here is to make sure that when we have an empty table, we + // only have a single post-shuffle partition. + // There is no particular reason that we pick 16. We just need a number to + // prevent maxPostShuffleInputSize from being set to 0. + val maxPostShuffleInputSize = + math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16) + math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize) + + case None => advisoryTargetPostShuffleInputSize + } logInfo( s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " + @@ -128,6 +189,88 @@ class ExchangeCoordinator( partitionStartIndices.toArray } + @GuardedBy("this") + private def doEstimationIfNecessary(): Unit = synchronized { + // It is unlikely that this method will be called from multiple threads + // (when multiple threads trigger the execution of THIS physical) + // because in common use cases, we will create new physical plan after + // users apply operations (e.g. projection) to an existing DataFrame. + // However, if it happens, we have synchronized to make sure only one + // thread will trigger the job submission. + if (!estimated) { + // Make sure we have the expected number of registered Exchange operators. + assert(exchanges.length == numExchanges) + + val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges) + + // Submit all map stages + val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]() + val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]() + var i = 0 + while (i < numExchanges) { + val exchange = exchanges(i) + val shuffleDependency = exchange.prepareShuffleDependency() + shuffleDependencies += shuffleDependency + if (shuffleDependency.rdd.partitions.length != 0) { + // submitMapStage does not accept RDD with 0 partition. + // So, we will not submit this dependency. + submittedStageFutures += + exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency) + } + i += 1 + } + + // Wait for the finishes of those submitted map stages. + val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length) + var j = 0 + while (j < submittedStageFutures.length) { + // This call is a blocking call. If the stage has not finished, we will wait at here. 
+ mapOutputStatistics(j) = submittedStageFutures(j).get() + j += 1 + } + + // If we have mapOutputStatistics.length < numExchange, it is because we do not submit + // a stage when the number of partitions of this dependency is 0. + assert(mapOutputStatistics.length <= numExchanges) + + // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the + // number of post-shuffle partitions. + val partitionStartIndices = + if (mapOutputStatistics.length == 0) { + Array.empty[Int] + } else { + estimatePartitionStartIndices(mapOutputStatistics) + } + + var k = 0 + while (k < numExchanges) { + val exchange = exchanges(k) + val rdd = + exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices)) + newPostShuffleRDDs.put(exchange, rdd) + + k += 1 + } + + // Finally, we set postShuffleRDDs and estimated. + assert(postShuffleRDDs.isEmpty) + assert(newPostShuffleRDDs.size() == numExchanges) + postShuffleRDDs.putAll(newPostShuffleRDDs) + estimated = true + } + } + + def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = { + doEstimationIfNecessary() + + if (!postShuffleRDDs.containsKey(exchange)) { + throw new IllegalStateException( + s"The given $exchange is not registered in this coordinator.") + } + + postShuffleRDDs.get(exchange) + } + override def toString: String = { s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index a72ee4f4fb37..16398e34bdeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -43,7 +43,8 @@ import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordCo */ case class ShuffleExchangeExec( var newPartitioning: Partitioning, - child: SparkPlan) extends Exchange { + child: SparkPlan, + @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { // NOTE: coordinator can be null after serialization/deserialization, // e.g. it can be null on the Executor side @@ -56,7 +57,14 @@ case class ShuffleExchangeExec( ) ++ readMetrics ++ writeMetrics override def nodeName: String = { - "Exchange" + val extraInfo = coordinator match { + case Some(exchangeCoordinator) => + s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})" + case _ => "" + } + + val simpleNodeName = "Exchange" + s"$simpleNodeName$extraInfo" } override def outputPartitioning: Partitioning = newPartitioning @@ -64,6 +72,21 @@ case class ShuffleExchangeExec( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size, longMetric("dataSize")) + override protected def doPrepare(): Unit = { + // If an ExchangeCoordinator is needed, we register this Exchange operator + // to the coordinator when we do prepare. It is important to make sure + // we register this operator right before the execution instead of register it + // in the constructor because it is possible that we create new instances of + // Exchange operators when we transform the physical plan + // (then the ExchangeCoordinator will hold references of unneeded Exchanges). + // So, we should only call registerExchange just before we start to execute + // the plan. 
+ coordinator match { + case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this) + case _ => + } + } + /** * Returns a [[ShuffleDependency]] that will partition rows of its child based on * the partitioning scheme defined in `newPartitioning`. Those partitions of @@ -106,32 +129,25 @@ case class ShuffleExchangeExec( protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { // Returns the same ShuffleRowRDD if this plan is used by multiple plans. if (cachedShuffleRDD == null) { - val shuffleDependency = prepareShuffleDependency() - cachedShuffleRDD = preparePostShuffleRDD(shuffleDependency) - } - cachedShuffleRDD - } - - private var _mapOutputStatistics: MapOutputStatistics = null - - def mapOutputStatistics: MapOutputStatistics = _mapOutputStatistics - - def eagerExecute(): RDD[InternalRow] = { - if (cachedShuffleRDD == null) { - val shuffleDependency = prepareShuffleDependency() - if (shuffleDependency.rdd.partitions.length != 0) { - // submitMapStage does not accept RDD with 0 partition. - // So, we will not submit this dependency. - val submittedStageFuture = sqlContext.sparkContext.submitMapStage(shuffleDependency) - _mapOutputStatistics = submittedStageFuture.get() + cachedShuffleRDD = coordinator match { + case Some(exchangeCoordinator) => + val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) + assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) + shuffleRDD + case _ => + val shuffleDependency = prepareShuffleDependency() + preparePostShuffleRDD(shuffleDependency) } - cachedShuffleRDD = preparePostShuffleRDD(shuffleDependency) } cachedShuffleRDD } } object ShuffleExchangeExec { + def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = { + ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator]) + } + /** * Determines whether records must be defensively copied before being sent to the shuffle. * Several of Spark's shuffle components will buffer deserialized Java objects in memory. 
The diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index a656a2f53e0a..45954f21c592 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -227,26 +227,26 @@ class SQLAppStatusListener( } } - private def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { - nodes.map { - case cluster: SparkPlanGraphCluster => - val storedCluster = new SparkPlanGraphClusterWrapper( - cluster.id, - cluster.name, - cluster.desc, - toStoredNodes(cluster.nodes), - cluster.metrics) - new SparkPlanGraphNodeWrapper(null, storedCluster) - - case node => - new SparkPlanGraphNodeWrapper(node, null) - } - } - private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = { val SparkListenerSQLExecutionStart(executionId, description, details, physicalPlanDescription, sparkPlanInfo, time) = event + def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + nodes.map { + case cluster: SparkPlanGraphCluster => + val storedCluster = new SparkPlanGraphClusterWrapper( + cluster.id, + cluster.name, + cluster.desc, + toStoredNodes(cluster.nodes), + cluster.metrics) + new SparkPlanGraphNodeWrapper(null, storedCluster) + + case node => + new SparkPlanGraphNodeWrapper(node, null) + } + } + val planGraph = SparkPlanGraph(sparkPlanInfo) val sqlPlanMetrics = planGraph.allNodes.flatMap { node => node.metrics.map { metric => (metric.accumulatorId, metric) } @@ -267,27 +267,6 @@ class SQLAppStatusListener( update(exec) } - private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = { - val SparkListenerSQLAdaptiveExecutionUpdate(executionId, - physicalPlanDescription, sparkPlanInfo) = event - - val planGraph = SparkPlanGraph(sparkPlanInfo) - val sqlPlanMetrics = planGraph.allNodes.flatMap { node => - node.metrics.map { metric => (metric.accumulatorId, metric) } - }.toMap.values.toList - - val graphToStore = new SparkPlanGraphWrapper( - executionId, - toStoredNodes(planGraph.nodes), - planGraph.edges) - kvstore.write(graphToStore) - - val exec = getOrCreateExecution(executionId) - exec.physicalPlanDescription = physicalPlanDescription - exec.metrics = sqlPlanMetrics - update(exec) - } - private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = { val SparkListenerSQLExecutionEnd(executionId, time) = event Option(liveExecutions.get(executionId)).foreach { exec => @@ -316,7 +295,6 @@ class SQLAppStatusListener( override def onOtherEvent(event: SparkListenerEvent): Unit = event match { case e: SparkListenerSQLExecutionStart => onExecutionStart(e) - case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e) case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e) case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e) case _ => // Ignore diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index eb1e44570ea8..03d75c4c1b82 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -37,13 +37,6 @@ case class SparkListenerSQLExecutionStart( time: Long) extends SparkListenerEvent -@DeveloperApi -case 
class SparkListenerSQLAdaptiveExecutionUpdate( - executionId: Long, - physicalPlanDescription: String, - sparkPlanInfo: SparkPlanInfo) - extends SparkListenerEvent - @DeveloperApi case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 15b4acfb662b..e57d080dadf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -96,18 +96,6 @@ object SparkPlanGraph { case "InputAdapter" => buildSparkPlanGraphNode( planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) - case "QueryStage" | "BroadcastQueryStage" | "ResultQueryStage" | "ShuffleQueryStage" => - if (exchanges.contains(planInfo.children.head)) { - // Point to the re-used exchange - val node = exchanges(planInfo.children.head) - edges += SparkPlanGraphEdge(node.id, parent.id) - } else { - buildSparkPlanGraphNode( - planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) - } - case "QueryStageInput" | "ShuffleQueryStageInput" | "BroadcastQueryStageInput" => - buildSparkPlanGraphNode( - planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null, exchanges) case "Subquery" if subgraph != null => // Subquery should not be included in WholeStageCodegen buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, parent, null, exchanges) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 3f7f6155d519..8c34e47314db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1282,7 +1282,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val agg = cp.groupBy('id % 2).agg(count('id)) agg.queryExecution.executedPlan.collectFirst { - case ShuffleExchangeExec(_, _: RDDScanExec) => + case ShuffleExchangeExec(_, _: RDDScanExec, _) => case BroadcastExchangeExec(_, _: RDDScanExec) => }.foreach { _ => fail( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index 0a26a5dcf273..3aa441cb0cf8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -22,8 +22,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite} import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql._ -import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageInput -import org.apache.spark.sql.execution.exchange.ExchangeCoordinator +import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -206,7 +205,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } test("test estimatePartitionStartIndices and enforce minimal number of reducers") { - val coordinator = new ExchangeCoordinator(100L, 2) + val coordinator = new ExchangeCoordinator(100L, Some(2)) { // The minimal 
number of post-shuffle partitions is not enforced because
@@ -264,9 +263,9 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       new SparkConf(false)
         .setMaster("local[*]")
         .setAppName("test")
-        .set(UI_ENABLED, false)
+        .set("spark.ui.enabled", "false")
+        .set("spark.driver.allowMultipleContexts", "true")
         .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
-        .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
         .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
         .set(
@@ -276,7 +275,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       case Some(numPartitions) =>
         sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
       case None =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
+        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
     }
     val spark = SparkSession.builder()
@@ -306,21 +305,25 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val queryStageInputs = agg.queryExecution.executedPlan.collect {
-          case q: ShuffleQueryStageInput => q
+        val exchanges = agg.queryExecution.executedPlan.collect {
+          case e: ShuffleExchangeExec => e
         }
-        assert(queryStageInputs.length === 1)
+        assert(exchanges.length === 1)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 5)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 5)
+              case o =>
             }
           case None =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 3)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 3)
+              case o =>
             }
         }
       }
@@ -353,21 +356,25 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val queryStageInputs = join.queryExecution.executedPlan.collect {
-          case q: ShuffleQueryStageInput => q
+        val exchanges = join.queryExecution.executedPlan.collect {
+          case e: ShuffleExchangeExec => e
         }
-        assert(queryStageInputs.length === 2)
+        assert(exchanges.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 5)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 5)
+              case o =>
             }
           case None =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 2)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 2)
+              case o =>
             }
         }
       }
@@ -405,26 +412,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
        // by the ExchangeCoordinator.
-        val queryStageInputs = join.queryExecution.executedPlan.collect {
-          case q: ShuffleQueryStageInput => q
+        val exchanges = join.queryExecution.executedPlan.collect {
+          case e: ShuffleExchangeExec => e
         }
-        assert(queryStageInputs.length === 2)
+        assert(exchanges.length === 4)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 5)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 5)
+              case o =>
             }
           case None =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 2)
-            }
+            assert(exchanges.forall(_.coordinator.isDefined))
+            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
         }
       }
-      withSparkSession(test, 16384, minNumPostShufflePartitions)
+      withSparkSession(test, 6644, minNumPostShufflePartitions)
     test(s"determining the number of reducers: complex query 2$testNameNote") {
@@ -457,26 +464,39 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         // Then, let's look at the number of post-shuffle partitions estimated
        // by the ExchangeCoordinator.
-        val queryStageInputs = join.queryExecution.executedPlan.collect {
-          case q: ShuffleQueryStageInput => q
+        val exchanges = join.queryExecution.executedPlan.collect {
+          case e: ShuffleExchangeExec => e
         }
-        assert(queryStageInputs.length === 2)
+        assert(exchanges.length === 3)
        minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 5)
+            exchanges.foreach {
+              case e: ShuffleExchangeExec =>
+                assert(e.coordinator.isDefined)
+                assert(e.outputPartitioning.numPartitions === 5)
+              case o =>
             }
           case None =>
-            queryStageInputs.foreach { q =>
-              assert(q.partitionStartIndices.isDefined)
-              assert(q.outputPartitioning.numPartitions === 3)
-            }
+            assert(exchanges.forall(_.coordinator.isDefined))
+            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
        }
      }
-      withSparkSession(test, 12000, minNumPostShufflePartitions)
+      withSparkSession(test, 6144, minNumPostShufflePartitions)
+    }
+  }
+
+  test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
+    val test = { spark: SparkSession =>
+      spark.sql("SET spark.sql.exchange.reuse=true")
+      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val resultDf = df.join(df, "key").join(df, "key")
+      val sparkPlan = resultDf.queryExecution.executedPlan
+      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
+      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+    }
+    withSparkSession(test, 4, None)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index c97041a8f341..142ab6170a73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -411,7 +411,8 @@ class PlannerSuite extends SharedSQLContext {
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning))
+      DummySparkPlan(outputPartitioning = partitioning),
+      None)
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
@@ -426,7 +427,8 @@ class PlannerSuite extends SharedSQLContext {
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning))
+      DummySparkPlan(outputPartitioning = partitioning),
+      None)
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
@@ -460,7 +462,8 @@ class PlannerSuite extends SharedSQLContext {
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
-        requiredChildOrdering = Seq(Seq.empty)))
+        requiredChildOrdering = Seq(Seq.empty)),
+      None)
     val inputPlan = SortMergeJoinExec(
       Literal(1) :: Nil,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala
deleted file mode 100644
index b02e7691e6ee..000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStageTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.RangeExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.joins.{BuildRight, ShuffledHashJoinExec}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSQLContext
-
-class PlanQueryStageTest extends SharedSQLContext {
-
-  test("Replaces ShuffleExchangeExec/BroadcastExchangeExec with reuse disabled") {
-    val range = org.apache.spark.sql.catalyst.plans.logical.Range(1, 100, 1, 1)
-    val originalPlan = ShuffleExchangeExec(
-      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
-      RangeExec(range))
-
-    val conf = new SQLConf
-    conf.setConfString("spark.sql.exchange.reuse", "false")
-    val planQueryStage = PlanQueryStage(conf)
-    val newPlan = planQueryStage(originalPlan)
-
-    val expectedPlan = ResultQueryStage(
-      ShuffleQueryStageInput(
-        ShuffleQueryStage(originalPlan),
-        range.output))
-
-    assert(newPlan == expectedPlan)
-  }
-
-  test("Reuses ShuffleQueryStage when possible") {
-    val conf = new SQLConf
-    conf.setConfString("spark.sql.exchange.reuse", "true")
-
-    val planQueryStage = PlanQueryStage(conf)
-    val newPlan = planQueryStage(createJoinExec(100, 100))
-
-    val collected = newPlan.collect {
-      case e: ShuffleQueryStageInput => e.childStage
-    }
-
-    assert(collected.length == 2)
-    assert(collected(0).eq(collected(1)))
-  }
-
-  test("Creates multiple ShuffleQueryStages when stages are different") {
-    val conf = new SQLConf
-    conf.setConfString("spark.sql.exchange.reuse", "true")
-
-    val planQueryStage = PlanQueryStage(conf)
-    val newPlan = planQueryStage(createJoinExec(100, 101))
-
-    val collected = newPlan.collect {
-      case e: ShuffleQueryStageInput => e.childStage
-    }
-
-    assert(collected.length == 2)
-    assert(!collected(0).eq(collected(1)))
-  }
-
-  def createJoinExec(leftNum: Int, rightNum: Int): ShuffledHashJoinExec = {
-    val left = ShuffleExchangeExec(
-      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
-      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, leftNum, 1, 1)))
-
-    val right = ShuffleExchangeExec(
-      HashPartitioning(Seq(UnresolvedAttribute("blah")), 100),
-      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(1, rightNum, 1, 1)))
-
-    ShuffledHashJoinExec(
-      Seq(UnresolvedAttribute("blah")),
-      Seq(UnresolvedAttribute("blah")),
-      Inner,
-      BuildRight,
-      None,
-      left,
-      right)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala
deleted file mode 100644
index ce56ebc4351e..000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/QueryStageTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.RangeExec
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SharedSQLContext
-
-class QueryStageTest extends SharedSQLContext {
-  test("Adaptive Query Execution repartitions") {
-    val originalNumPartitions = 100
-
-    val plan = {
-      val leftRangeExec = RangeExec(
-        org.apache.spark.sql.catalyst.plans.logical.Range(1, 1000, 1, 1))
-
-      ShuffleExchangeExec(
-        HashPartitioning(leftRangeExec.output, originalNumPartitions),
-        leftRangeExec)
-    }
-
-    assert(plan.execute().getNumPartitions == originalNumPartitions)
-    assert(PlanQueryStage.apply(new SQLConf)(plan).execute().getNumPartitions == 1)
-  }
-}
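Reviewer sketch (not part of the patch): the assertions restored in ExchangeCoordinatorSuite all follow one pattern -- collect ShuffleExchangeExec nodes from the executed plan and inspect their coordinator and post-shuffle partition count. A minimal standalone Scala helper illustrating that pattern, assuming the pre-adaptive-execution signature ShuffleExchangeExec(partitioning, child, coordinator: Option[ExchangeCoordinator]) that this change restores; the object and method names are hypothetical.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

object CoordinatedExchangeInspector {
  // Post-shuffle partition counts of every shuffle exchange that carries an ExchangeCoordinator.
  def coordinatedPartitionCounts(plan: SparkPlan): Seq[Int] =
    plan.collect {
      case e @ ShuffleExchangeExec(_, _, Some(_)) => e.outputPartitioning.numPartitions
    }
}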