Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,20 +271,33 @@ object QueryExecution {
* are correct, insert whole stage code gen, and try to reduce the work done by reusing exchanges
* and subqueries.
*/
/**
 * Builds the sequence of physical-plan preparation rules. Note: the copied diff contained both
 * the pre- and post-change lines of this method; this is the resolved post-change version.
 *
 * @param sparkSession the session whose conf and columnar rules drive the preparation rules
 * @return the ordered rules applied to a planned [[SparkPlan]] before execution
 */
private[execution] def preparations(sparkSession: SparkSession): Seq[Rule[SparkPlan]] = {

  // When AQE is enabled, clone the session and switch AQE off in the clone. The clone is
  // handed only to the subquery-planning rules below, so that subqueries are not AQE-ed
  // whenever the main query itself did not go through `InsertAdaptiveSparkPlan`.
  // (If AQE is already off, no clone is needed — reuse the session as-is.)
  val sparkSessionWithAdaptiveExecutionOff =
    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
      val session = sparkSession.cloneSession()
      session.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
      session
    } else {
      sparkSession
    }

  Seq(
    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
    InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession)),
    // If the following rules apply, it means the main query is not AQE-ed, so we make sure the
    // subqueries are not AQE-ed either.
    PlanDynamicPruningFilters(sparkSessionWithAdaptiveExecutionOff),
    PlanSubqueries(sparkSessionWithAdaptiveExecutionOff),
    EnsureRequirements(sparkSession.sessionState.conf),
    ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
      sparkSession.sessionState.columnarRules),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf)
  )
}

/**
* Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.scalatest.GivenWhenThen
import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
Expand All @@ -33,7 +33,7 @@ import org.apache.spark.sql.test.SharedSparkSession
/**
* Test suite for the filtering ratio policy used to trigger dynamic partition pruning (DPP).
*/
class DynamicPartitionPruningSuite
abstract class DynamicPartitionPruningSuiteBase
extends QueryTest
with SharedSparkSession
with GivenWhenThen
Expand All @@ -43,9 +43,14 @@ class DynamicPartitionPruningSuite

import testImplicits._

val adaptiveExecutionOn: Boolean

override def beforeAll(): Unit = {
super.beforeAll()

spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, adaptiveExecutionOn)
spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY, true)

val factData = Seq[(Int, Int, Int, Int)](
(1000, 1, 1, 10),
(1010, 2, 1, 10),
Expand Down Expand Up @@ -153,6 +158,8 @@ class DynamicPartitionPruningSuite
sql("DROP TABLE IF EXISTS fact_stats")
sql("DROP TABLE IF EXISTS dim_stats")
} finally {
spark.sessionState.conf.unsetConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
spark.sessionState.conf.unsetConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY)
super.afterAll()
}
}
Expand Down Expand Up @@ -195,6 +202,11 @@ class DynamicPartitionPruningSuite
fail(s"Invalid child node found in\n$s")
}
}

val isMainQueryAdaptive = plan.isInstanceOf[AdaptiveSparkPlanExec]
subqueriesAll(plan).filterNot(subqueryBroadcast.contains).foreach { s =>
assert(s.find(_.isInstanceOf[AdaptiveSparkPlanExec]).isDefined == isMainQueryAdaptive)
}
}

/**
Expand Down Expand Up @@ -1173,8 +1185,7 @@ class DynamicPartitionPruningSuite
}

test("join key with multiple references on the filtering plan") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
// when enable AQE, the reusedExchange is inserted when executed.
withTable("fact", "dim") {
spark.range(100).select(
Expand Down Expand Up @@ -1270,3 +1281,11 @@ class DynamicPartitionPruningSuite
}
}
}

// Concrete DPP suite variant: `beforeAll` in the base class reads `adaptiveExecutionOn`
// and sets `SQLConf.ADAPTIVE_EXECUTION_ENABLED` to it, so this runs with AQE disabled.
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
override val adaptiveExecutionOn: Boolean = false
}

// Concrete DPP suite variant: `beforeAll` in the base class reads `adaptiveExecutionOn`
// and sets `SQLConf.ADAPTIVE_EXECUTION_ENABLED` to it, so this runs with AQE enabled.
class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase {
override val adaptiveExecutionOn: Boolean = true
}