Commits (30)
52f0222  [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL (maryannxue, May 28, 2019)
1c665d2  Address review comments (maryannxue, May 28, 2019)
a9b4209  Address review comments (maryannxue, May 29, 2019)
cbfbc4e  Remove reuse-stage lock (maryannxue, May 29, 2019)
4255421  Address review comments (maryannxue, May 30, 2019)
a656e42  Refactoring: remove applyPhysicalRules from InsertAdaptiveSparkPlan (maryannxue, May 30, 2019)
ac0794d  Address review comments (maryannxue, May 30, 2019)
62af5be  Address review comments (maryannxue, May 31, 2019)
77a668b  add lock object (maryannxue, May 31, 2019)
3e85e74  fix (maryannxue, Jun 1, 2019)
bd6a364  fix (maryannxue, Jun 2, 2019)
9eaf307  Address review comments (maryannxue, Jun 3, 2019)
e2fa8e3  fix (maryannxue, Jun 3, 2019)
55450e9  fix (maryannxue, Jun 5, 2019)
4b0755d  Fix the ambiguous logical node matching issue (maryannxue, Jun 12, 2019)
6e547d7  Revert "fix" (maryannxue, Jun 12, 2019)
baef964  Followup for 'Revert "fix"' (maryannxue, Jun 12, 2019)
ec59f88  Address review comments (maryannxue, Jun 12, 2019)
9af4eb1  Refine explain string for AdaptiveSparkPlanExec (maryannxue, Jun 12, 2019)
5b5ac2e  minor refinement (maryannxue, Jun 12, 2019)
da33bd7  Refine explain string of QueryStageExec (maryannxue, Jun 12, 2019)
5688cb4  code refinement for TreeNode; add a test in TreeNodeSuite (maryannxue, Jun 13, 2019)
a40b771  Address review comments (maryannxue, Jun 13, 2019)
37905f5  Address review comments (maryannxue, Jun 13, 2019)
eb8fe75  fix (maryannxue, Jun 14, 2019)
4481085  code refinement (maryannxue, Jun 14, 2019)
8570ec0  Address review comments (maryannxue, Jun 14, 2019)
237c067  Address review comments (maryannxue, Jun 14, 2019)
d6e040b  Merge remote-tracking branch 'origin/master' into aqe (maryannxue, Jun 14, 2019)
e265104  fix (maryannxue, Jun 15, 2019)
@@ -317,76 +317,92 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
*/
def mapChildren(f: BaseType => BaseType): BaseType = {
if (children.nonEmpty) {
var changed = false
def mapChild(child: Any): Any = child match {
case arg: TreeNode[_] if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
val newChild1 = if (containsChild(arg1)) {
f(arg1.asInstanceOf[BaseType])
} else {
arg1.asInstanceOf[BaseType]
}
mapProductElements(f, applyToAll = false)
} else {
this
}
}

val newChild2 = if (containsChild(arg2)) {
f(arg2.asInstanceOf[BaseType])
} else {
arg2.asInstanceOf[BaseType]
}
/**
* Returns a copy of this node where `f` has been applied to all applicable `TreeNode` elements
* in the productIterator.
* @param f the transform function to be applied on applicable `TreeNode` elements.
* @param applyToAll If true, the transform function will be applied to all `TreeNode` elements
* even for non-child elements; otherwise, the function will only be applied
* on children nodes. Also, when this is true, a copy of this node will be
* returned even if no elements have been changed.
*/
private def mapProductElements(
Contributor: If we can then we should spin this off in a separate PR.

f: BaseType => BaseType,
applyToAll: Boolean): BaseType = {
var changed = false

if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
changed = true
(newChild1, newChild2)
} else {
tuple
}
case other => other
}
def mapChild(child: Any): Any = child match {
Contributor: mapElement?

case arg: TreeNode[_] if applyToAll || containsChild(arg) =>
Contributor: It is not super intuitive to use the same flag both to allow transformations on all elements and to force a copy. Can we maybe split them for documentation purposes? I am also not entirely convinced that we need to transform all elements; this change now also transforms expressions in a plan node.

Contributor Author: I'm not sure about the expressions; I think it depends entirely on the usage, and right now we don't need to copy expressions for AQE, I believe. On the other hand, there are "fake leaf nodes" that derive from LeafNode but do have child nodes not declared as children, e.g., ReusedExchange. Again, right now AQE only cares about the logical plans, so we are probably OK (I can't think of any logical-plan fake leaf nodes so far).

Contributor Author: I double-checked and there is no "fake LeafNode" in the logical plan space, so I removed "applyToAll" from the condition for transforming the elements and renamed it to "forceCopy". I've also changed the method name back to "mapChildren" since it only applies to child nodes.

val newChild = f(arg.asInstanceOf[BaseType])
if (applyToAll || !(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) =>
val newChild1 = if (applyToAll || containsChild(arg1)) {
f(arg1.asInstanceOf[BaseType])
} else {
arg1.asInstanceOf[BaseType]
}

val newChild2 = if (applyToAll || containsChild(arg2)) {
f(arg2.asInstanceOf[BaseType])
} else {
arg2.asInstanceOf[BaseType]
}

if (applyToAll || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
changed = true
(newChild1, newChild2)
} else {
tuple
}
case other => other
}

val newArgs = mapProductIterator {
case arg: TreeNode[_] if containsChild(arg) =>
val newArgs = mapProductIterator {
case arg: TreeNode[_] if applyToAll || containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (applyToAll || !(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case Some(arg: TreeNode[_]) if applyToAll || containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (applyToAll || !(newChild fastEquals arg)) {
changed = true
Some(newChild)
} else {
Some(arg)
}
case m: Map[_, _] => m.mapValues {
case arg: TreeNode[_] if applyToAll || containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
if (applyToAll || !(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case Some(arg: TreeNode[_]) if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
Some(newChild)
} else {
Some(arg)
}
case m: Map[_, _] => m.mapValues {
case arg: TreeNode[_] if containsChild(arg) =>
val newChild = f(arg.asInstanceOf[BaseType])
if (!(newChild fastEquals arg)) {
changed = true
newChild
} else {
arg
}
case other => other
}.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
}
if (changed) makeCopy(newArgs) else this
} else {
this
case other => other
}.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
}
if (applyToAll || changed) makeCopy(newArgs, applyToAll) else this
}
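
A minimal, self-contained Scala sketch (toy types, not Spark's TreeNode; all names here are illustrative assumptions) of the copy-on-change behavior discussed in the review thread above, and why a deep clone() needs a force-copy mode on top of it:

```scala
sealed trait Node { def children: Seq[Node] }
case class Leaf(value: Int) extends Node { def children: Seq[Node] = Nil }
case class Branch(children: Seq[Node]) extends Node

// Copy-on-change traversal: without forceCopy it returns the same instance when no child changed,
// which is exactly the behavior a deep clone() has to override.
def mapChildren(n: Node, f: Node => Node, forceCopy: Boolean): Node = n match {
  case b @ Branch(cs) =>
    val newCs = cs.map(f)
    val changed = newCs.zip(cs).exists { case (newChild, oldChild) => !(newChild eq oldChild) }
    if (forceCopy || changed) Branch(newCs) else b
  case leaf: Leaf =>
    if (forceCopy) leaf.copy() else leaf
}

def deepCopy(n: Node): Node = mapChildren(n, deepCopy, forceCopy = true)

val tree = Branch(Seq(Leaf(1), Leaf(2)))
assert(deepCopy(tree) ne tree)                                   // force-copy yields fresh instances
assert(mapChildren(tree, identity, forceCopy = false) eq tree)   // copy-on-change returns `this`
```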

/**
@@ -402,9 +418,20 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
* that are not present in the productIterator.
* @param newArgs the new product arguments.
*/
def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") {
def makeCopy(newArgs: Array[AnyRef]): BaseType = makeCopy(newArgs, allowEmptyArgs = false)

/**
* Creates a copy of this type of tree node after a transformation.
* Must be overridden by child classes that have constructor arguments
* that are not present in the productIterator.
* @param newArgs the new product arguments.
* @param allowEmptyArgs whether to allow argument list to be empty.
*/
private def makeCopy(
newArgs: Array[AnyRef],
allowEmptyArgs: Boolean): BaseType = attachTree(this, "makeCopy") {
// Skip no-arg constructors that are just there for kryo.
val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0)
val ctors = getClass.getConstructors.filter(allowEmptyArgs || _.getParameterTypes.size != 0)
if (ctors.isEmpty) {
sys.error(s"No valid constructor for $nodeName")
}
@@ -447,6 +474,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}
}

override def clone(): BaseType = {
mapProductElements(_.clone(), applyToAll = true)
}
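
A quick usage sketch of what the new override guarantees to callers; it mirrors the contract the TreeNodeSuite test below checks, and `plan` here stands for any concrete TreeNode instance (an assumption of this sketch, not code from the diff):

```scala
// Every node in the cloned tree is a fresh instance, but the two trees compare equal.
val cloned = plan.clone()
assert(cloned ne plan)
assert(cloned == plan)
```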

/**
* Returns the name of this type of TreeNode. Defaults to the class name.
* Note that we remove the "Exec" suffix for physical operators here.
@@ -292,6 +292,12 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(64 * 1024 * 1024)

val RUNTIME_REOPTIMIZATION_ENABLED =
buildConf("spark.sql.runtime.reoptimization.enabled")
.doc("When true, enable runtime query re-optimization.")
.booleanConf
.createWithDefault(false)

val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
.doc("When true, enable adaptive query execution.")
.booleanConf
Member: We probably need to update the doc for this config. It isn't enabled when runtime query re-optimization is true.

Contributor Author (maryannxue, May 28, 2019): Let's leave it for now and see whether we should use the existing config spark.sql.adaptive.enabled instead.

@@ -1882,7 +1888,10 @@ class SQLConf extends Serializable with Logging {
def targetPostShuffleInputSize: Long =
getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)

def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
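
A hedged sketch of how the two flags interact after this change; the keys are the ones defined in this diff, while `spark` is assumed to be an existing SparkSession:

```scala
// Illustrative only: when both flags are on, the new runtime re-optimization path takes
// precedence and the legacy adaptive-execution path reports itself as disabled.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.runtime.reoptimization.enabled", "true")

// conf.runtimeReoptimizationEnabled => true
// conf.adaptiveExecutionEnabled     => false, since it is
//   ADAPTIVE_EXECUTION_ENABLED && !RUNTIME_REOPTIMIZATION_ENABLED
```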

def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions.DslString
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -82,6 +82,11 @@ case class SelfReferenceUDF(
def apply(key: String): Boolean = config.contains(key)
}

case class FakeLeafPlan(child: LogicalPlan)
extends org.apache.spark.sql.catalyst.plans.logical.LeafNode {
override def output: Seq[Attribute] = child.output
}

class TreeNodeSuite extends SparkFunSuite with SQLHelper {
test("top node changed") {
val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
@@ -673,4 +678,34 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
})
}
}

test("clone") {
def assertDifferentInstance(before: AnyRef, after: AnyRef): Unit = {
assert(before.ne(after) && before == after)
before.asInstanceOf[TreeNode[_]].children.zip(
after.asInstanceOf[TreeNode[_]].children).foreach {
case (beforeChild: AnyRef, afterChild: AnyRef) =>
assertDifferentInstance(beforeChild, afterChild)
}
}

// Empty constructor
val rowNumber = RowNumber()
assertDifferentInstance(rowNumber, rowNumber.clone())

// Overridden `makeCopy`
val oneRowRelation = OneRowRelation()
assertDifferentInstance(oneRowRelation, oneRowRelation.clone())

// Multi-way operators
val intersect =
Intersect(oneRowRelation, Union(Seq(oneRowRelation, oneRowRelation)), isAll = false)
assertDifferentInstance(intersect, intersect.clone())

// Leaf node with an inner child
val leaf = FakeLeafPlan(intersect)
val leafCloned = leaf.clone()
assertDifferentInstance(leaf, leafCloned)
assertDifferentInstance(leaf.child, leafCloned.asInstanceOf[FakeLeafPlan].child)
}
}
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.adaptive.InsertAdaptiveSparkPlan
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -74,9 +75,15 @@ class QueryExecution(

lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
SparkSession.setActiveSession(sparkSession)
// Runtime re-optimization requires a unique instance of every node in the logical plan.
val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
optimizedPlan.clone()
} else {
optimizedPlan
}
// TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
// but we will implement to choose the best plan.
planner.plan(ReturnAnswer(optimizedPlan)).next()
planner.plan(ReturnAnswer(logicalPlan)).next()
}

// executedPlan should not be used to initialize any SparkPlan. It should be
@@ -107,6 +114,9 @@ class QueryExecution(

/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
// `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
// as the original plan is hidden behind `AdaptiveSparkPlanExec`.
InsertAdaptiveSparkPlan(sparkSession),
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
CollapseCodegenStages(sparkSession.sessionState.conf),
@@ -40,9 +40,11 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.DataType

object SparkPlan {
// a TreeNode tag in SparkPlan, to carry its original logical plan. The planner will add this tag
// when converting a logical plan to a physical plan.
/** The original [[LogicalPlan]] from which this [[SparkPlan]] is converted. */
val LOGICAL_PLAN_TAG = TreeNodeTag[LogicalPlan]("logical_plan")

/** The [[LogicalPlan]] inherited from its ancestor. */
val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited")
}

/**
@@ -79,6 +81,35 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
super.makeCopy(newArgs)
}

/**
* @return The logical plan this plan is linked to.
*/
def logicalLink: Option[LogicalPlan] =
getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
.orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))

/**
* Set logical plan link recursively if unset.
*/
def setLogicalLink(logicalPlan: LogicalPlan): Unit = {
setLogicalLink(logicalPlan, false)
}

private def setLogicalLink(logicalPlan: LogicalPlan, inherited: Boolean = false): Unit = {
// Stop at a descendant which is the root of a sub-tree transformed from another logical node.
if (inherited && getTagValue(SparkPlan.LOGICAL_PLAN_TAG).isDefined) {
return
}

val tag = if (inherited) {
SparkPlan.LOGICAL_PLAN_INHERITED_TAG
} else {
SparkPlan.LOGICAL_PLAN_TAG
}
setTagValue(tag, logicalPlan)
children.foreach(_.setLogicalLink(logicalPlan, true))
Member: Once we set the inherited logical plan into the children, we can't set a logical plan into them? So only the top SparkPlan has its own logical plan, and all its children just have the inherited one?

Contributor Author: Yes, that's true. And you could always "force set" this logical link if need be. It's not necessary to draw a line between "logical plan" and "inherited logical plan" for adaptive execution; this is simply to make sure any future use of it can tell a top node from the rest.

}
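
A small illustration of the tag propagation described in the thread above, written against the API introduced in this diff; `top` and `logical` are assumed values, and the asserts only hold when no descendant already carries its own LOGICAL_PLAN_TAG:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

def linkAndInspect(top: SparkPlan, logical: LogicalPlan): Unit = {
  top.setLogicalLink(logical)
  // The root records the link under LOGICAL_PLAN_TAG...
  assert(top.logicalLink.contains(logical))
  // ...while children without a tag of their own see it via LOGICAL_PLAN_INHERITED_TAG;
  // propagation stops at any descendant that already has its own LOGICAL_PLAN_TAG.
  top.children.foreach(child => assert(child.logicalLink.contains(logical)))
}
```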

/**
* @return All metrics containing metrics of this SparkPlan.
*/
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
@@ -53,6 +54,8 @@ private[execution] object SparkPlanInfo {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case ReusedSubqueryExec(child) => child :: Nil
case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil
case stage: QueryStageExec => stage.plan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
@@ -21,6 +21,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf
@@ -36,6 +37,7 @@ class SparkPlanner(
override def strategies: Seq[Strategy] =
experimentalMethods.extraStrategies ++
extraPlanningStrategies ++ (
LogicalQueryStageStrategy ::
PythonEvals ::
DataSourceV2Strategy ::
FileSourceStrategy ::