Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ package org.apache.spark
* (may be inexact due to use of compressed map statuses)
*/
private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
extends Serializable
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ class QueryExecution(
* row format conversions as needed.
*/
protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
val rules = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
adaptivePreparations
} else {
preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
preparations
}
rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
}

/** A sequence of rules that will be applied in order to the physical plan before execution. */
Expand All @@ -109,9 +110,11 @@ class QueryExecution(
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf))

// With adaptive execution, whole stage codegen will be done inside `QueryStageExecutor`.
protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
PlanSubqueries(sparkSession),
EnsureRequirements(sparkSession.sessionState.conf),
ReuseExchange(sparkSession.sessionState.conf),
ReuseSubquery(sparkSession.sessionState.conf),
// PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
// by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryStageInput -> QueryStage

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.adaptive.QueryStageInput
import org.apache.spark.sql.execution.adaptive.QueryStage
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -53,7 +53,7 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val children = plan match {
case ReusedExchangeExec(_, child) => child :: Nil
case i: QueryStageInput => i.childStage :: Nil
case stage: QueryStage => stage.finalPlan :: Nil
case _ => plan.children ++ plan.subqueries
}
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@

package org.apache.spark.sql.execution.adaptive

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

/**
* Divide the spark plan into multiple QueryStages. For each Exchange in the plan, it adds a
Expand All @@ -37,44 +33,25 @@ import org.apache.spark.sql.types.StructType
case class PlanQueryStage(conf: SQLConf) extends Rule[SparkPlan] {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf: SQLConf is no longer needed.


def apply(plan: SparkPlan): SparkPlan = {

val newPlan = if (!conf.exchangeReuseEnabled) {
plan.transformUp {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
} else {
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
val stages = mutable.HashMap[StructType, ArrayBuffer[QueryStage]]()

plan.transformUp {
case exchange: Exchange =>
val sameSchema = stages.getOrElseUpdate(exchange.schema, ArrayBuffer[QueryStage]())
val samePlan = sameSchema.find { s =>
exchange.sameResult(s.child)
}
if (samePlan.isDefined) {
// Keep the output of this exchange, the following plans require that to resolve
// attributes.
exchange match {
case e: ShuffleExchangeExec => ShuffleQueryStageInput(
samePlan.get.asInstanceOf[ShuffleQueryStage], exchange.output)
case e: BroadcastExchangeExec => BroadcastQueryStageInput(
samePlan.get.asInstanceOf[BroadcastQueryStage], exchange.output)
}
} else {
val queryStageInput = exchange match {
case e: ShuffleExchangeExec =>
ShuffleQueryStageInput(ShuffleQueryStage(e), e.output)
case e: BroadcastExchangeExec =>
BroadcastQueryStageInput(BroadcastQueryStage(e), e.output)
}
sameSchema += queryStageInput.childStage
queryStageInput
}
}
val exchangeToQueryStage = new java.util.IdentityHashMap[Exchange, QueryStage]
val newPlan = plan.transformUp {
case e: ShuffleExchangeExec =>
val queryStage = ShuffleQueryStage(e)
exchangeToQueryStage.put(e, queryStage)
ShuffleQueryStageReaderExec(queryStage, queryStage.output)
case e: BroadcastExchangeExec =>
val queryStage = BroadcastQueryStage(e)
exchangeToQueryStage.put(e, queryStage)
BroadcastQueryStageReaderExec(queryStage, queryStage.output)
// The `ReusedExchangeExec` was added in the rule `ReuseExchange`, via transforming up the
// query plan. This rule also transform up the query plan, so when we hit `ReusedExchangeExec`
// here, the exchange being reused must already be hit before and there should be an entry
// for it in `exchangeToQueryStage`.
case e: ReusedExchangeExec =>
exchangeToQueryStage.get(e.child) match {
case q: ShuffleQueryStage => ShuffleQueryStageReaderExec(q, e.output)
case q: BroadcastQueryStage => BroadcastQueryStageReaderExec(q, e.output)
}
}
ResultQueryStage(newPlan)
}
Expand Down
Loading