Commit ceeaa75

delta003azarum authored and committed
Revert "[AE2.3-02][SPARK-23128] Add QueryStage and the framework for adaptive execution (auto setting the number of reducer)" (palantir#514)
This reverts https://github.com/palantir/spark/pull/443/files#diff-3cd46a3f60c5352282bd3f2c9efff7fc (fysa @justinuang). This reverts commit 8a4a29b, reversing changes made to 8bfddaf.
1 parent 39513a4 commit ceeaa75

File tree

18 files changed, +410 -756 lines

core/src/main/scala/org/apache/spark/MapOutputStatistics.scala

Lines changed: 0 additions & 1 deletion
@@ -25,4 +25,3 @@ package org.apache.spark
  * (may be inexact due to use of compressed map statuses)
  */
 private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
-  extends Serializable
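
For context, MapOutputStatistics carries the per-partition output sizes of a shuffle, which the adaptive-execution machinery being reverted here (ExchangeCoordinator and the QueryStage framework) reads in order to coalesce small post-shuffle partitions. The snippet below is a hypothetical, simplified sketch of that idea, not code from this commit; the name coalescePartitionStarts and the targetSize parameter are illustrative only.

// Hypothetical sketch: pack contiguous shuffle partitions into groups of roughly
// `targetSize` bytes and return the start index of each post-shuffle partition.
// This mirrors how bytesByPartitionId is consumed; it is not Spark's actual algorithm.
def coalescePartitionStarts(bytesByPartitionId: Array[Long], targetSize: Long): Array[Int] = {
  val starts = scala.collection.mutable.ArrayBuffer(0)
  var currentBytes = 0L
  for (i <- bytesByPartitionId.indices) {
    if (i > 0 && currentBytes + bytesByPartitionId(i) > targetSize) {
      starts += i        // begin a new post-shuffle partition here
      currentBytes = 0L
    }
    currentBytes += bytesByPartitionId(i)
  }
  starts.toArray
}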

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 15 deletions
@@ -280,19 +280,14 @@ object SQLConf {
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
-      .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(1)
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .internal()
+      .doc("The advisory minimal number of post-shuffle partitions provided to " +
+        "ExchangeCoordinator. This setting is used in our test to make sure we " +
+        "have enough parallelism to expose issues that will not be exposed with a " +
+        "single partition. When the value is a non-positive value, this setting will " +
+        "not be provided to ExchangeCoordinator.")
       .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(500)
+      .createWithDefault(-1)
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1740,9 +1735,8 @@ class SQLConf extends Serializable with Logging {
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
-
-  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
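
As the restored doc string notes, a non-positive value means no minimum is handed to ExchangeCoordinator at all. Below is a minimal sketch of how a caller might interpret the flag after this revert; the helper name is hypothetical and not part of the diff.

import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper: translate the restored conf value into an optional hint.
// minNumPostShufflePartitions defaults to -1, i.e. "no minimum provided".
def minPostShufflePartitionsHint(conf: SQLConf): Option[Int] = {
  val min = conf.minNumPostShufflePartitions
  if (min > 0) Some(min) else None
}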

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 15 deletions
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.PlanQueryStage
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -97,11 +96,7 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    } else {
-      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    }
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }
 
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -112,15 +107,6 @@ class QueryExecution(
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf),
-    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
-    // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession.sessionState.conf))
-
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: AnalysisException => e.toString }
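
With the adaptive branch removed, prepareForExecution is again a plain left fold of the preparation rules over the physical plan. The standalone sketch below restates that fold pattern; the applyAll helper is illustrative and not part of QueryExecution.

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical restatement of the fold in prepareForExecution: each rule sees the
// plan produced by the previous one, in the order the rules are listed.
def applyAll(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan =
  rules.foldLeft(plan) { case (current, rule) => rule.apply(current) }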

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

Lines changed: 0 additions & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.QueryStageInput
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo

@@ -52,7 +51,6 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case i: QueryStageInput => i.childStage :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
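
After the revert, fromSparkPlan only special-cases ReusedExchangeExec when collecting a node's children; every other node contributes plan.children ++ plan.subqueries. The sketch below reuses that same match to walk a plan tree and count its nodes; countNodes is a hypothetical example, not part of SparkPlanInfo.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Hypothetical traversal using the same child-collection match as fromSparkPlan,
// but simply counting nodes instead of building SparkPlanInfo objects.
def countNodes(plan: SparkPlan): Int = {
  val children = plan match {
    case ReusedExchangeExec(_, child) => child :: Nil
    case _ => plan.children ++ plan.subqueries
  }
  1 + children.map(countNodes).sum
}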

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanQueryStage.scala

Lines changed: 0 additions & 79 deletions
This file was deleted.
