[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL #20303
Changes from 21 commits
```diff
@@ -278,18 +278,23 @@ object SQLConf {
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
     .doc("When true, enable adaptive query execution.")
     .booleanConf
-    .createWithDefault(false)
+    .createWithDefault(true)
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .internal()
-      .doc("The advisory minimal number of post-shuffle partitions provided to " +
-        "ExchangeCoordinator. This setting is used in our test to make sure we " +
-        "have enough parallelism to expose issues that will not be exposed with a " +
-        "single partition. When the value is a non-positive value, this setting will " +
-        "not be provided to ExchangeCoordinator.")
+      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
-      .createWithDefault(-1)
+      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(1)
+
+  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
+    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .intConf
+      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(500)
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1728,8 +1733,9 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+
+  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
```
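For reference, here is a minimal sketch of how these three configs could be set when building a session. The config keys come straight from the diff above; the master, app name, and values are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative settings; the keys match the confs defined in the diff above.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("adaptive-execution-demo")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
  .config("spark.sql.adaptive.maxNumPostShufflePartitions", "500")
  .getOrCreate()
```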
New file: AdaptiveSparkPlanExec.scala (115 lines added)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.adaptive

import java.util.concurrent.CountDownLatch

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

/**
 * A root node to execute the query plan adaptively. It splits the query plan into independent
 * stages and executes them in order according to their dependencies. Each query stage
 * materializes its output at the end. When one stage completes, the data statistics of its
 * materialized output will be used to optimize the subsequent stages.
 * This is called mid-query re-optimization in database literature.
 */
case class AdaptiveSparkPlanExec(initialPlan: SparkPlan, session: SparkSession)
  extends LeafExecNode {

  override def output: Seq[Attribute] = initialPlan.output

  @volatile private var currentPlan: SparkPlan = initialPlan
  @volatile private var error: Throwable = null

  // We will release the lock when all the query stages are completed, or we fail to
  // optimize/execute query stages. Getting `finalPlan` will block until the lock is released.
  // This is better than wait()/notify(), as we can easily check if the computation has
  // completed by calling `readyLock.getCount()`.
  private val readyLock = new CountDownLatch(1)

  private def createCallback(executionId: Option[Long]) = new QueryStageManagerCallback {
    override def onPlanUpdate(updatedPlan: SparkPlan): Unit = {
      updateCurrentPlan(updatedPlan, executionId)
    }

    override def onFinalPlan(finalPlan: SparkPlan): Unit = {
      updateCurrentPlan(finalPlan, executionId)
      readyLock.countDown()
    }

    override def onStageMaterializationFailed(stage: QueryStageExec, e: Throwable): Unit = {
      error = new SparkException(
        s"""
           |Failed to materialize query stage ${stage.id}:
           |${stage.plan.treeString}
         """.stripMargin, e)
      readyLock.countDown()
    }

    override def onError(e: Throwable): Unit = {
      error = e
      readyLock.countDown()
    }
  }

  private def updateCurrentPlan(newPlan: SparkPlan, executionId: Option[Long]): Unit = {
    currentPlan = newPlan
    executionId.foreach { id =>
      session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
        id,
        SQLExecution.getQueryExecution(id).toString,
        SparkPlanInfo.fromSparkPlan(currentPlan)))
    }
  }

  def finalPlan: SparkPlan = {
    if (readyLock.getCount > 0) {
      val sc = session.sparkContext
      val executionId = Option(sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
      val stageManager = new QueryStageManager(initialPlan, session, createCallback(executionId))
      stageManager.start()
      readyLock.await()
      stageManager.stop()
    }

    if (error != null) throw error
    currentPlan
  }

  override def executeCollect(): Array[InternalRow] = finalPlan.executeCollect()
  override def executeTake(n: Int): Array[InternalRow] = finalPlan.executeTake(n)
  override def executeToIterator(): Iterator[InternalRow] = finalPlan.executeToIterator()
  override def doExecute(): RDD[InternalRow] = finalPlan.execute()

  override def generateTreeString(
      depth: Int,
      lastChildren: Seq[Boolean],
      append: String => Unit,
      verbose: Boolean,
      prefix: String = "",
      addSuffix: Boolean = false,
      maxFields: Int): Unit = {
    currentPlan.generateTreeString(
      depth, lastChildren, append, verbose, "", false, maxFields)
  }
}
```
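The comment above `readyLock` argues for `CountDownLatch` over `wait()`/`notify()` because the latch makes completion checkable without blocking. A standalone sketch of that pattern, with hypothetical names that are not part of this PR:

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical one-shot result holder using the same latch pattern as
// AdaptiveSparkPlanExec: a worker signals exactly once (success or failure),
// callers block in await(), and isDone polls completion without blocking.
class OneShotResult[T] {
  private val ready = new CountDownLatch(1)
  @volatile private var value: Option[T] = None
  @volatile private var error: Throwable = null

  def succeed(v: T): Unit = { value = Some(v); ready.countDown() }
  def fail(e: Throwable): Unit = { error = e; ready.countDown() }

  // Non-blocking completion check, analogous to readyLock.getCount() above.
  def isDone: Boolean = ready.getCount == 0

  // Blocks until succeed() or fail() is called; rethrows any failure.
  def await(): T = {
    ready.await()
    if (error != null) throw error
    value.get
  }
}
```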
New file: InsertAdaptiveSparkPlan.scala (39 lines added)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec

/**
 * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which executes the query plan
 * adaptively with runtime data statistics. Note that this rule must be run after
 * [[org.apache.spark.sql.execution.exchange.EnsureRequirements]], so that the exchange nodes are
 * already inserted.
 */
case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan] {

  override def apply(plan: SparkPlan): SparkPlan = plan match {
    case _: ExecutedCommandExec => plan
    case _ if session.sessionState.conf.adaptiveExecutionEnabled =>
      AdaptiveSparkPlanExec(plan, session.cloneSession())
    case _ => plan
  }
}
```
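Assuming this rule is wired into the physical-plan preparation rules (done elsewhere in the PR), the wrapping should be observable from an ordinary query. A hedged example:

```scala
// With spark.sql.adaptive.enabled=true (the new default in this PR), any
// shuffle-inducing query should be wrapped by AdaptiveSparkPlanExec.
import spark.implicits._

val df = spark.range(0, 1000)
  .withColumn("bucket", $"id" % 10)
  .groupBy("bucket")
  .count()

df.explain()  // physical plan root should now be AdaptiveSparkPlanExec
df.collect()  // executes stage by stage, re-optimizing as statistics arrive
```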
Review comment, on the `checkValue` lambdas above:

> super nit: we can simply write `_ > 0`
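Applied to the diff above, the suggested shorthand would make the min-partitions config read as follows (a sketch of the suggestion, not the committed code):

```scala
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
  buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
    .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
    .intConf
    .checkValue(_ > 0, "The minimum shuffle partition number must be a positive integer.")
    .createWithDefault(1)
```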