@@ -300,12 +300,6 @@ object SQLConf {
     .bytesConf(ByteUnit.BYTE)
     .createWithDefault(64 * 1024 * 1024)

-  val RUNTIME_REOPTIMIZATION_ENABLED =
-    buildConf("spark.sql.runtime.reoptimization.enabled")
-      .doc("When true, enable runtime query re-optimization.")
-      .booleanConf
-      .createWithDefault(false)
-
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
     .doc("When true, enable adaptive query execution.")
     .booleanConf
@@ -1948,10 +1942,7 @@ class SQLConf extends Serializable with Logging {
   def targetPostShuffleInputSize: Long =
     getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)

-  def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)
-
-  def adaptiveExecutionEnabled: Boolean =
-    getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
+  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

   def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)

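Note: with RUNTIME_REOPTIMIZATION_ENABLED gone, spark.sql.adaptive.enabled is the only switch left. A minimal sketch of toggling it (hypothetical app name and master, not part of this patch):

    import org.apache.spark.sql.SparkSession

    object AqeToggle {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")            // hypothetical local setup
          .appName("aqe-toggle")
          .config("spark.sql.adaptive.enabled", "true")  // the single remaining flag
          .getOrCreate()

        // The flag can also be flipped per session at runtime.
        spark.conf.set("spark.sql.adaptive.enabled", "false")
        println(spark.conf.get("spark.sql.adaptive.enabled"))  // prints "false"
        spark.stop()
      }
    }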
@@ -76,7 +76,7 @@ class QueryExecution(
   lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     SparkSession.setActiveSession(sparkSession)
     // Runtime re-optimization requires a unique instance of every node in the logical plan.
-    val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
+    val logicalPlan = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
       optimizedPlan.clone()
     } else {
       optimizedPlan
@@ -44,7 +44,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan

   override def apply(plan: SparkPlan): SparkPlan = plan match {
     case _: ExecutedCommandExec => plan
-    case _ if conf.runtimeReoptimizationEnabled && supportAdaptive(plan) =>
+    case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
       try {
         // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
         // back to non-adaptive mode if adaptive execution is not supported in any of the sub-queries.
@@ -57,13 +57,13 @@
         AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache)
       } catch {
         case SubqueryAdaptiveNotSupportedException(subquery) =>
-          logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
+          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
             s"but is not supported for sub-query: $subquery.")
           plan
       }
     case _ =>
-      if (conf.runtimeReoptimizationEnabled) {
-        logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
+      if (conf.adaptiveExecutionEnabled) {
+        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
           s"but is not supported for query: $plan.")
       }
       plan
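Note: a hedged sketch of how the wrapper inserted by InsertAdaptiveSparkPlan surfaces to users (hypothetical session setup; the AdaptiveSparkPlan(isFinalPlan=false) rendering is taken from AdaptiveQueryExecSuite below and may differ in other revisions):

    import org.apache.spark.sql.SparkSession

    object AdaptivePlanProbe {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")            // hypothetical local setup
          .appName("adaptive-plan-probe")
          .config("spark.sql.adaptive.enabled", "true")
          .getOrCreate()
        import spark.implicits._

        // A query with a shuffle, so the rule applies and wraps the plan
        // in AdaptiveSparkPlanExec.
        val df = Seq((1, "a"), (2, "b")).toDF("key", "value")
          .groupBy("key").count()

        // Before execution the root prints as AdaptiveSparkPlan(isFinalPlan=false).
        println(df.queryExecution.executedPlan)
        spark.stop()
      }
    }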
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int =
-    if (conf.runtimeReoptimizationEnabled) {
+    if (conf.adaptiveExecutionEnabled) {
       conf.maxNumPostShufflePartitions
     } else {
       conf.numShufflePartitions
@@ -320,7 +320,7 @@ abstract class StreamExecution(
     logicalPlan

     // Adaptive execution can change num shuffle partitions, disallow
-    sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
+    sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
     // Disable cost-based join optimization as we do not want stateful operations to be rearranged
     sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
     offsetSeqMetadata = OffsetSeqMetadata(
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo

     val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled

-    if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
-      logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
+    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }

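Note: the manager only warns; StreamExecution above then forces the key to "false" on its internal session, since changing the number of shuffle partitions mid-stream would break the mapping of state to partitions. A hedged sketch of triggering the warning (hypothetical setup; the rate source and console sink are built in):

    import org.apache.spark.sql.SparkSession

    object StreamingAqeWarning {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")            // hypothetical local setup
          .appName("streaming-aqe-warning")
          .config("spark.sql.adaptive.enabled", "true")  // user enables AQE
          .getOrCreate()

        // start() logs the warning above; the stream itself runs with
        // spark.sql.adaptive.enabled forced to "false" internally.
        val query = spark.readStream
          .format("rate")                // built-in source emitting timestamped rows
          .load()
          .writeStream
          .format("console")
          .start()

        query.awaitTermination(5000)     // let a micro-batch or two run
        query.stop()
        spark.stop()
      }
    }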
@@ -275,7 +275,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       .setAppName("test")
       .set(UI_ENABLED, false)
       .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
-      .set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       .set(
         SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
@@ -35,7 +35,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
     val planBefore = dfAdaptive.queryExecution.executedPlan
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)"))
     val result = dfAdaptive.collect()
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val df = sql(query)
       QueryTest.sameRows(result.toSeq, df.collect().toSeq)
     }
@@ -82,7 +82,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Change merge join to broadcast join") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
@@ -95,7 +95,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Change merge join to broadcast join and reduce number of shuffle partitions") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
       SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
@@ -119,7 +119,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Scalar subquery") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a " +
@@ -133,7 +133,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Scalar subquery in later stages") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a " +
@@ -147,7 +147,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("multiple joins") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -168,7 +168,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("multiple joins with aggregate") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -191,7 +191,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("multiple joins with aggregate 2") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -214,7 +214,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Exchange reuse") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT value FROM testData join testData2 ON key = a " +
@@ -230,7 +230,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Exchange reuse with subqueries") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT a FROM testData join testData2 ON key = a " +
@@ -246,7 +246,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Exchange reuse across subqueries") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
       SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -266,7 +266,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Subquery reuse") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT a FROM testData join testData2 ON key = a " +
@@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {

   test("Broadcast exchange reuse across subqueries") {
     withSQLConf(
-      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
       SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -307,7 +307,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
   }

   test("Union/Except/Intersect queries") {
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       runAdaptiveAndVerifyResult(
         """
           |SELECT * FROM testData
@@ -322,7 +322,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
   }

   test("Subquery de-correlation in Union queries") {
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("a", "b") {
         Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
         Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")