
Commit a763413

delta003 authored and bulldozer-bot[bot] committed
Revert "[AE2.3-02][SPARK-23128] Add QueryStage and the framework for adaptive execution (auto setting the number of reducer)" (apache#514)
This reverts https://github.com/palantir/spark/pull/443/files#diff-3cd46a3f60c5352282bd3f2c9efff7fc

fysa @justinuang

This reverts commit 8a4a29b, reversing changes made to 8bfddaf.
1 parent 0420565 commit a763413

18 files changed: +412 / -760 lines changed


core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 0 additions & 1 deletion
@@ -25,4 +25,3 @@ package org.apache.spark
  * (may be inexact due to use of compressed map statuses)
  */
 private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
-  extends Serializable
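
The per-partition byte counts carried by MapOutputStatistics are what the reverted adaptive-execution framework used to pick the number of reducers. As a rough illustration only (hypothetical names, not Spark's implementation), a greedy pass can pack adjacent post-shuffle partitions until a target size is reached:

// Illustrative sketch only: a stand-in for MapOutputStatistics plus a simple
// greedy coalescing pass over adjacent post-shuffle partitions.
object MapOutputStatsSketch {
  final case class MapOutputStats(shuffleId: Int, bytesByPartitionId: Array[Long])

  // Pack adjacent partitions into groups until each group reaches targetBytes.
  def coalesce(stats: MapOutputStats, targetBytes: Long): Seq[Seq[Int]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    var current = Vector.empty[Int]
    var currentBytes = 0L
    stats.bytesByPartitionId.zipWithIndex.foreach { case (bytes, idx) =>
      if (current.nonEmpty && currentBytes + bytes > targetBytes) {
        groups += current
        current = Vector.empty
        currentBytes = 0L
      }
      current :+= idx
      currentBytes += bytes
    }
    if (current.nonEmpty) groups += current
    groups.toSeq
  }

  def main(args: Array[String]): Unit = {
    val stats = MapOutputStats(shuffleId = 0, bytesByPartitionId = Array(10L, 5L, 40L, 3L, 2L, 60L))
    println(coalesce(stats, targetBytes = 50L))  // groups: (0,1), (2,3,4), (5)
  }
}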

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 15 deletions
@@ -282,19 +282,14 @@ object SQLConf {
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
-      .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(1)
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .internal()
+      .doc("The advisory minimal number of post-shuffle partitions provided to " +
+        "ExchangeCoordinator. This setting is used in our test to make sure we " +
+        "have enough parallelism to expose issues that will not be exposed with a " +
+        "single partition. When the value is a non-positive value, this setting will " +
+        "not be provided to ExchangeCoordinator.")
       .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(500)
+      .createWithDefault(-1)
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1783,9 +1778,8 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
-
-  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
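
A short usage sketch of the restored setting, assuming a Spark 2.4-era build (such as this fork) where the config above is registered; only standard SparkSession APIs are used:

import org.apache.spark.sql.SparkSession

object MinPostShufflePartitionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("min-post-shuffle-partitions-example")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.minNumPostShufflePartitions", "4")
      .getOrCreate()

    // Read the value back; a non-positive value (the default, -1) means the
    // advisory minimum is simply not provided to ExchangeCoordinator.
    val min = spark.conf.get("spark.sql.adaptive.minNumPostShufflePartitions").toInt
    val advisoryMin = if (min > 0) Some(min) else None
    println(advisoryMin)  // Some(4)

    spark.stop()
  }
}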

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 2 additions & 15 deletions
@@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.PlanQueryStage
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
@@ -95,11 +95,7 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    } else {
-      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    }
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }
 
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -110,15 +106,6 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf),
-    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
-    // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession.sessionState.conf))
-
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: AnalysisException => e.toString }
 
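
The change above restores a single, unconditional rule pipeline. The following self-contained sketch (simplified stand-ins, not Spark's Rule or SparkPlan classes) shows the foldLeft pattern and why the order of the sequence matters:

object RulePipelineSketch {
  final case class Plan(steps: List[String])

  type Rule = Plan => Plan

  // Order matters: in the reverted code the stage-splitting rule had to run last,
  // because later transforms would only see a single sub-tree. These rules are stand-ins.
  val preparations: Seq[Rule] = Seq(
    p => p.copy(steps = p.steps :+ "PlanSubqueries"),
    p => p.copy(steps = p.steps :+ "EnsureRequirements"),
    p => p.copy(steps = p.steps :+ "ReuseExchange")
  )

  // Mirrors preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
  def prepareForExecution(plan: Plan): Plan =
    preparations.foldLeft(plan)((sp, rule) => rule(sp))

  def main(args: Array[String]): Unit =
    println(prepareForExecution(Plan(Nil)).steps.mkString(" -> "))
    // PlanSubqueries -> EnsureRequirements -> ReuseExchange
}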

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 0 additions & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.QueryStageInput
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,7 +52,6 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case i: QueryStageInput => i.childStage :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
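
The restored match decides which children a plan node reports: a reused exchange reports only the reused child, everything else reports its children plus subqueries. A simplified, hypothetical sketch of that selection logic (not Spark's SparkPlan hierarchy):

object PlanChildrenSketch {
  sealed trait Node {
    def children: Seq[Node]
    def subqueries: Seq[Node] = Nil
  }
  final case class Leaf(name: String) extends Node {
    val children: Seq[Node] = Nil
  }
  final case class ReusedExchange(child: Node) extends Node {
    val children: Seq[Node] = Seq(child)
  }
  final case class Generic(children: Seq[Node], override val subqueries: Seq[Node]) extends Node

  // Mirrors the shape of the restored match in SparkPlanInfo.fromSparkPlan.
  def infoChildren(plan: Node): Seq[Node] = plan match {
    case ReusedExchange(child) => child :: Nil
    case _                     => plan.children ++ plan.subqueries
  }

  def main(args: Array[String]): Unit = {
    println(infoChildren(ReusedExchange(Leaf("scan"))))
    println(infoChildren(Generic(Seq(Leaf("a"), Leaf("b")), Seq(Leaf("subquery")))))
  }
}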

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala

Lines changed: 0 additions & 79 deletions
This file was deleted.
