Commits (30)
52f0222
[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
maryannxue May 28, 2019
1c665d2
Address review comments
maryannxue May 28, 2019
a9b4209
Address review comments
maryannxue May 29, 2019
cbfbc4e
Remove reuse-stage lock
maryannxue May 29, 2019
4255421
Address review comments
maryannxue May 30, 2019
a656e42
Refactoring: remove applyPhysicalRules from InsertAdaptiveSparkPlan
maryannxue May 30, 2019
ac0794d
Address review comments
maryannxue May 30, 2019
62af5be
Address review comments
maryannxue May 31, 2019
77a668b
add lock object
maryannxue May 31, 2019
3e85e74
fix
maryannxue Jun 1, 2019
bd6a364
fix
maryannxue Jun 2, 2019
9eaf307
Address review comments
maryannxue Jun 3, 2019
e2fa8e3
fix
maryannxue Jun 3, 2019
55450e9
fix
maryannxue Jun 5, 2019
4b0755d
Fix the ambiguous logical node matching issue
maryannxue Jun 12, 2019
6e547d7
Revert "fix"
maryannxue Jun 12, 2019
baef964
Followup for 'Revert "fix"'
maryannxue Jun 12, 2019
ec59f88
Address review comments
maryannxue Jun 12, 2019
9af4eb1
Refine explain string for AdaptiveSparkPlanExec
maryannxue Jun 12, 2019
5b5ac2e
minor refinement
maryannxue Jun 12, 2019
da33bd7
Refine explain string of QueryStageExec
maryannxue Jun 12, 2019
5688cb4
code refinement for TreeNode; add a test in TreeNodeSuite
maryannxue Jun 13, 2019
a40b771
Address review comments
maryannxue Jun 13, 2019
37905f5
Address review comments
maryannxue Jun 13, 2019
eb8fe75
fix
maryannxue Jun 14, 2019
4481085
code refinement
maryannxue Jun 14, 2019
8570ec0
Address review comments
maryannxue Jun 14, 2019
237c067
Address review comments
maryannxue Jun 14, 2019
d6e040b
Merge remote-tracking branch 'origin/master' into aqe
maryannxue Jun 14, 2019
e265104
fix
maryannxue Jun 15, 2019
@@ -292,6 +292,12 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)

val RUNTIME_REOPTIMIZATION_ENABLED =
buildConf("spark.sql.runtime.reoptimization.enabled")
.doc("When true, enable runtime query re-optimization.")
.booleanConf
.createWithDefault(false)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.booleanConf
Member:
We probably need to update the doc for this config: it has no effect when runtime query re-optimization is enabled.

maryannxue (Contributor, Author), May 28, 2019:
Let's leave it for now and see whether we should use the existing config spark.sql.adaptive.enabled instead.

@@ -1889,7 +1895,10 @@ class SQLConf extends Serializable with Logging {
def targetPostShuffleInputSize: Long =
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
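
To make the interaction of the two flags concrete, here is a minimal sketch that is not part of the diff. It assumes a build containing this patch; SQLConf and setConfString are existing Spark APIs, the two accessors come from the changes above, and the ConfGateSketch wrapper exists only for illustration.

import org.apache.spark.sql.internal.SQLConf

object ConfGateSketch extends App {
  // The read path makes the two flags mutually exclusive: adaptiveExecutionEnabled is
  // getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED).
  val conf = new SQLConf
  conf.setConfString("spark.sql.adaptive.enabled", "true")
  conf.setConfString("spark.sql.runtime.reoptimization.enabled", "true")

  println(conf.runtimeReoptimizationEnabled) // true
  println(conf.adaptiveExecutionEnabled)     // false: gated off by the new flag
}

In other words, turning on the new flag silently disables the legacy adaptive-execution path, which is what the review comment above suggests the doc should eventually spell out.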

@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -74,9 +75,15 @@ class QueryExecution(

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
SparkSession.setActiveSession(sparkSession)
// Runtime re-optimization requires a unique instance of every node in the logical plan.
val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
optimizedPlan.clone()
} else {
optimizedPlan
}
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
planner.plan(ReturnAnswer(logicalPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
@@ -107,6 +114,9 @@

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
InsertAdaptiveSparkPlan(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
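
As a side note on the comment about AdaptiveSparkPlanExec being a leaf node: the standalone sketch below (plain Scala with hypothetical names, not Spark code) models why the following preparation rules become no-ops once the plan is wrapped: a rule can only reach nodes exposed through children, and a leaf exposes none.

object LeafWrapperSketch extends App {
  sealed trait Node { def children: Seq[Node] }
  case object Scan extends Node { def children: Seq[Node] = Nil }
  case class Filter(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
  // Stand-in for AdaptiveSparkPlanExec: it holds a plan but exposes no children to rules.
  case class AdaptiveWrapper(hidden: Node) extends Node { def children: Seq[Node] = Nil }

  // Stand-in for a preparation rule: it can only visit nodes reachable via children.
  def reachable(n: Node): Int = 1 + n.children.map(reachable).sum

  val plan = Filter(Scan)
  println(reachable(plan))                  // 2: both nodes are visible to the rule
  println(reachable(AdaptiveWrapper(plan))) // 1: the wrapped plan is hidden from later rules
}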

@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.DataType

object SparkPlan {
// a TreeNode tag in SparkPlan, to carry its original logical plan. The planner will add this tag
// when converting a logical plan to a physical plan.
/** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
val LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("logical_plan")

/** The [[LogicalPlan]] inherited from its ancestor. */
val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited")
}

/**
@@ -79,6 +81,35 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
super.makeCopy(newArgs)
}

/**
* @return The logical plan this plan is linked to.
*/
def logicalLink: Option[LogicalPlan] =
getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
.orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))

/**
* Set logical plan link recursively if unset.
*/
def setLogicalLink(logicalPlan: LogicalPlan): Unit = {
setLogicalLink(logicalPlan, false)
}

private def setLogicalLink(logicalPlan: LogicalPlan, inherited: Boolean = false): Unit = {
// Stop at a descendant which is the root of a sub-tree transformed from another logical node.
if (inherited && getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isDefined) {
return
}

val tag = if (inherited) {
SparkPlan.LOGICAL_PLAN_INHERITED_TAG
} else {
SparkPlan.LOGICAL_PLAN_TAG
}
setTagValue(tag, logicalPlan)
children.foreach(_.setLogicalLink(logicalPlan, true))
Member:
Once we set the inherited logical plan on the children, we can't set a direct logical plan on them? So only the top SparkPlan has its own logical plan, and all its children just have the inherited one?

maryannxue (Contributor, Author):
Yes, that's true. And you could always "force set" this logical link if need be.
It's not strictly necessary to distinguish "logical plan" from "inherited logical plan" for adaptive execution itself; this is simply so that any future use can tell a top node from the rest.

}

/**
* @return All metrics containing metrics of this SparkPlan.
*/
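
To illustrate the two-tag scheme discussed in the conversation above, here is a standalone model in plain Scala (hypothetical names, not Spark code): the top node of a transformed sub-tree keeps its own direct link, its descendants receive the inherited link, and propagation stops at any descendant that already carries a direct link of its own.

object LogicalLinkSketch extends App {
  case class Logical(name: String)

  class Physical(val name: String, val children: Seq[Physical] = Nil) {
    private var direct: Option[Logical] = None    // stands in for LOGICAL_PLAN_TAG
    private var inherited: Option[Logical] = None // stands in for LOGICAL_PLAN_INHERITED_TAG

    def logicalLink: Option[Logical] = direct.orElse(inherited)

    def setLogicalLink(lp: Logical, isInherited: Boolean = false): Unit = {
      // Stop at a descendant that already roots a sub-tree transformed from another
      // logical node, i.e. one that carries its own direct link.
      if (isInherited && direct.isDefined) return
      if (isInherited) inherited = Some(lp) else direct = Some(lp)
      children.foreach(_.setLogicalLink(lp, isInherited = true))
    }
  }

  val scan = new Physical("Scan")
  val sort = new Physical("Sort", Seq(scan))

  scan.setLogicalLink(Logical("Relation")) // the scan was planned from its own logical node
  sort.setLogicalLink(Logical("Sort"))     // propagation stops at the scan

  println(sort.logicalLink) // Some(Logical(Sort)): direct link on the top node
  println(scan.logicalLink) // Some(Logical(Relation)): not overwritten by the inherited link
}

This mirrors the behaviour described in the reply: only the top node carries its own logical plan, and a "force set" is still possible because calling setLogicalLink with the default argument always writes the direct tag.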

@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -53,6 +54,8 @@ private[execution] object SparkPlanInfo {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case ReusedSubqueryExec(child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>

@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf
@@ -36,6 +37,7 @@ class SparkPlanner(
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::

@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -58,6 +59,8 @@ case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
protected override def doExecute(): RDD[InternalRow] = {
throw new UnsupportedOperationException()
}

override def setLogicalLink(logicalPlan: LogicalPlan): Unit = {}
}

abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -69,7 +72,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ReturnAnswer(rootPlan) => rootPlan
case _ => plan
}
p.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, logicalPlan)
p.setLogicalLink(logicalPlan)
p
}
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.util.control.NonFatal
@@ -551,56 +552,6 @@ object WholeStageCodegenExec {
}
}

object WholeStageCodegenId {
// codegenStageId: ID for codegen stages within a query plan.
// It does not affect equality, nor does it participate in destructuring pattern matching
// of WholeStageCodegenExec.
//
// This ID is used to help differentiate between codegen stages. It is included as a part
// of the explain output for physical plans, e.g.
//
// == Physical Plan ==
// *(5) SortMergeJoin [x#3L], [y#9L], Inner
// :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
// : +- Exchange hashpartitioning(x#3L, 200)
// : +- *(1) Project [(id#0L % 2) AS x#3L]
// : +- *(1) Filter isnotnull((id#0L % 2))
// : +- *(1) Range (0, 5, step=1, splits=8)
// +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
// +- Exchange hashpartitioning(y#9L, 200)
// +- *(3) Project [(id#6L % 2) AS y#9L]
// +- *(3) Filter isnotnull((id#6L % 2))
// +- *(3) Range (0, 5, step=1, splits=8)
//
// where the ID makes it obvious that not all adjacent codegen'd plan operators are of the
// same codegen stage.
//
// The codegen stage ID is also optionally included in the name of the generated classes as
// a suffix, so that it's easier to associate a generated class back to the physical operator.
// This is controlled by SQLConf: spark.sql.codegen.useIdInClassName
//
// The ID is also included in various log messages.
//
// Within a query, a codegen stage in a plan starts counting from 1, in "insertion order".
// WholeStageCodegenExec operators are inserted into a plan in depth-first post-order.
// See CollapseCodegenStages.insertWholeStageCodegen for the definition of insertion order.
//
// 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
// is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
// failed to generate/compile code.

private val codegenStageCounter: ThreadLocal[Integer] = ThreadLocal.withInitial(() => 1)

def resetPerQuery(): Unit = codegenStageCounter.set(1)

def getNextStageId(): Int = {
val counter = codegenStageCounter
val id = counter.get()
counter.set(id + 1)
id
}
}

/**
* WholeStageCodegen compiles a subtree of plans that support codegen together into single Java
* function.
@@ -824,8 +775,48 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)

/**
* Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
*
* The `codegenStageCounter` generates ID for codegen stages within a query plan.
* It does not affect equality, nor does it participate in destructuring pattern matching
* of WholeStageCodegenExec.
*
* This ID is used to help differentiate between codegen stages. It is included as a part
* of the explain output for physical plans, e.g.
*
* == Physical Plan ==
* *(5) SortMergeJoin [x#3L], [y#9L], Inner
* :- *(2) Sort [x#3L ASC NULLS FIRST], false, 0
* : +- Exchange hashpartitioning(x#3L, 200)
* : +- *(1) Project [(id#0L % 2) AS x#3L]
* : +- *(1) Filter isnotnull((id#0L % 2))
* : +- *(1) Range (0, 5, step=1, splits=8)
* +- *(4) Sort [y#9L ASC NULLS FIRST], false, 0
* +- Exchange hashpartitioning(y#9L, 200)
* +- *(3) Project [(id#6L % 2) AS y#9L]
* +- *(3) Filter isnotnull((id#6L % 2))
* +- *(3) Range (0, 5, step=1, splits=8)
*
* where the ID makes it obvious that not all adjacent codegen'd plan operators are of the
* same codegen stage.
*
* The codegen stage ID is also optionally included in the name of the generated classes as
* a suffix, so that it's easier to associate a generated class back to the physical operator.
* This is controlled by SQLConf: spark.sql.codegen.useIdInClassName
*
* The ID is also included in various log messages.
*
* Within a query, a codegen stage in a plan starts counting from 1, in "insertion order".
* WholeStageCodegenExec operators are inserted into a plan in depth-first post-order.
* See CollapseCodegenStages.insertWholeStageCodegen for the definition of insertion order.
*
* 0 is reserved as a special ID value to indicate a temporary WholeStageCodegenExec object
* is created, e.g. for special fallback handling when an existing WholeStageCodegenExec
* failed to generate/compile code.
*/
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case class CollapseCodegenStages(
conf: SQLConf,
codegenStageCounter: AtomicInteger = new AtomicInteger(0))
Contributor:
Where do we not use the default value?

Contributor:
Yeah, we could have made that part of the class body. OTOH this also works :)...

extends Rule[SparkPlan] {

private def supportCodegen(e: Expression): Boolean = e match {
case e: LeafExpression => true
@@ -869,14 +860,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(insertInputAdapter(plan))(WholeStageCodegenId.getNextStageId())
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}

def apply(plan: SparkPlan): SparkPlan = {
if (conf.wholeStageEnabled) {
WholeStageCodegenId.resetPerQuery()
insertWholeStageCodegen(plan)
} else {
plan
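
On the question above about the default value: the visible diff doesn't show a call site that passes its own counter, so the following is only a sketch of the intent (plain Scala, hypothetical names). The caller now owns the counter instead of relying on a thread-local that is reset per query, so repeated plannings of the same query, as adaptive execution presumably needs, can share one counter and keep codegen stage IDs unique.

import java.util.concurrent.atomic.AtomicInteger

object CodegenCounterSketch extends App {
  // Stand-in for CollapseCodegenStages after this change: the counter is a constructor
  // argument with a fresh default, rather than a shared thread-local.
  class CollapseStagesSketch(counter: AtomicInteger = new AtomicInteger(0)) {
    def nextStageId(): Int = counter.incrementAndGet()
  }

  val perQueryCounter = new AtomicInteger(0)

  val firstPlanning = new CollapseStagesSketch(perQueryCounter)
  println(firstPlanning.nextStageId()) // 1
  println(firstPlanning.nextStageId()) // 2

  // A later re-planning of the same query reuses the counter, so IDs keep growing
  // instead of restarting at 1.
  val rePlanning = new CollapseStagesSketch(perQueryCounter)
  println(rePlanning.nextStageId()) // 3
}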