diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af67632706df..a6b1318cd495 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -300,12 +300,6 @@ object SQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(64 * 1024 * 1024)
 
-  val RUNTIME_REOPTIMIZATION_ENABLED =
-    buildConf("spark.sql.runtime.reoptimization.enabled")
-      .doc("When true, enable runtime query re-optimization.")
-      .booleanConf
-      .createWithDefault(false)
-
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
     .doc("When true, enable adaptive query execution.")
     .booleanConf
@@ -1948,10 +1942,7 @@ class SQLConf extends Serializable with Logging {
 
   def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
 
-  def runtimeReoptimizationEnabled: Boolean = getConf(RUNTIME_REOPTIMIZATION_ENABLED)
-
-  def adaptiveExecutionEnabled: Boolean =
-    getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
+  def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
   def reducePostShufflePartitionsEnabled: Boolean =
     getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9fcffac53c99..c8531e9a046a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -76,7 +76,7 @@ class QueryExecution(
   lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
     SparkSession.setActiveSession(sparkSession)
     // Runtime re-optimization requires a unique instance of every node in the logical plan.
-    val logicalPlan = if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
+    val logicalPlan = if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
       optimizedPlan.clone()
     } else {
       optimizedPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index a1b0e291c1b6..14ca2b41a442 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -44,7 +44,7 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
 
   override def apply(plan: SparkPlan): SparkPlan = plan match {
     case _: ExecutedCommandExec => plan
-    case _ if conf.runtimeReoptimizationEnabled && supportAdaptive(plan) =>
+    case _ if conf.adaptiveExecutionEnabled && supportAdaptive(plan) =>
       try {
         // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse. Fall
         // back to non-adaptive mode if adaptive execution is supported in any of the sub-queries.
@@ -57,13 +57,13 @@ case class InsertAdaptiveSparkPlan(session: SparkSession) extends Rule[SparkPlan
         AdaptiveSparkPlanExec(newPlan, session, subqueryMap, stageCache)
       } catch {
         case SubqueryAdaptiveNotSupportedException(subquery) =>
-          logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
+          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
             s"but is not supported for sub-query: $subquery.")
           plan
       }
     case _ =>
-      if (conf.runtimeReoptimizationEnabled) {
-        logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} is enabled " +
+      if (conf.adaptiveExecutionEnabled) {
+        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
           s"but is not supported for query: $plan.")
       }
       plan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b7196d4578fb..8184baf50b04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int =
-    if (conf.runtimeReoptimizationEnabled) {
+    if (conf.adaptiveExecutionEnabled) {
       conf.maxNumPostShufflePartitions
     } else {
       conf.numShufflePartitions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index eb957192bcdc..7c1f6ca42c1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -320,7 +320,7 @@ abstract class StreamExecution(
       logicalPlan
 
       // Adaptive execution can change num shuffle partitions, disallow
-      sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
+      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
       // Disable cost-based join optimization as we do not want stateful operations to be rearranged
       sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
       offsetSeqMetadata = OffsetSeqMetadata(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 0b472c509e76..1705d5624409 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
     val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
 
-    if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
-      logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
+    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index fea77c78980c..20fed07d3872 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -275,7 +275,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
       .setAppName("test")
       .set(UI_ENABLED, false)
       .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
-      .set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
       .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
       .set(
         SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 0a0973ad2111..d8efca323d51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -35,7 +35,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
     val planBefore = dfAdaptive.queryExecution.executedPlan
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)"))
     val result = dfAdaptive.collect()
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "false") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       val df = sql(query)
       QueryTest.sameRows(result.toSeq, df.collect().toSeq)
     }
@@ -82,7 +82,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Change merge join to broadcast join") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a where value = '1'")
@@ -95,7 +95,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Change merge join to broadcast join and reduce number of shuffle partitions") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
         SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
@@ -119,7 +119,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Scalar subquery") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a " +
@@ -133,7 +133,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Scalar subquery in later stages") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData join testData2 ON key = a " +
@@ -147,7 +147,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("multiple joins") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -168,7 +168,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("multiple joins with aggregate") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -191,7 +191,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("multiple joins with aggregate 2") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
@@ -214,7 +214,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Exchange reuse") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT value FROM testData join testData2 ON key = a " +
@@ -230,7 +230,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Exchange reuse with subqueries") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT a FROM testData join testData2 ON key = a " +
@@ -246,7 +246,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Exchange reuse across subqueries") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
         SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -266,7 +266,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Subquery reuse") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT a FROM testData join testData2 ON key = a " +
@@ -285,7 +285,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
 
   test("Broadcast exchange reuse across subqueries") {
     withSQLConf(
-        SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
         SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
@@ -307,7 +307,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
   }
 
   test("Union/Except/Intersect queries") {
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       runAdaptiveAndVerifyResult(
         """
           |SELECT * FROM testData
@@ -322,7 +322,7 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
   }
 
   test("Subquery de-correlation in Union queries") {
-    withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("a", "b") {
         Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("a")
         Seq("a" -> 2, "b" -> 1).toDF("id", "num").createTempView("b")