@@ -25,4 +25,3 @@ package org.apache.spark
  * (may be inexact due to use of compressed map statuses)
  */
 private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
-  extends Serializable
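Note on what this class carries: bytesByPartitionId holds the estimated output size of each post-shuffle partition, and the doc comment above warns that the numbers can be inexact when compressed map statuses are used. A minimal standalone sketch of consuming such statistics (ShuffleStats and the sample sizes are illustrative stand-ins, not Spark source):

// Illustrative stand-in for MapOutputStatistics; not Spark source.
case class ShuffleStats(shuffleId: Int, bytesByPartitionId: Array[Long])

object ShuffleStatsExample {
  def main(args: Array[String]): Unit = {
    val stats = ShuffleStats(shuffleId = 0, bytesByPartitionId = Array(64L, 0L, 1024L, 512L))
    // Approximate total shuffle output; per-partition sizes may be inexact when they
    // come from compressed map statuses.
    val totalBytes = stats.bytesByPartitionId.sum
    println(s"shuffle ${stats.shuffleId}: $totalBytes bytes over ${stats.bytesByPartitionId.length} partitions")
  }
}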
@@ -282,19 +282,14 @@ object SQLConf {

   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
-      .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The minimum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(1)
-
-  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
-      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution.")
+      .internal()
+      .doc("The advisory minimal number of post-shuffle partitions provided to " +
+        "ExchangeCoordinator. This setting is used in our test to make sure we " +
+        "have enough parallelism to expose issues that will not be exposed with a " +
+        "single partition. When the value is a non-positive value, this setting will " +
+        "not be provided to ExchangeCoordinator.")
       .intConf
-      .checkValue(numPartitions => numPartitions > 0, "The maximum shuffle partition number " +
-        "must be a positive integer.")
-      .createWithDefault(500)
+      .createWithDefault(-1)
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1783,9 +1778,8 @@ class SQLConf extends Serializable with Logging {

   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
-  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
-
-  def maxNumPostShufflePartitions: Int = getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

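As restored above, spark.sql.adaptive.minNumPostShufflePartitions is internal and defaults to -1: a non-positive value means no advisory minimum is handed to ExchangeCoordinator, while a positive value is normally set only in tests to force enough post-shuffle parallelism. A minimal standalone sketch of that semantics (ExchangeCoordinatorSettings and advisoryMinPartitions are hypothetical helpers, not Spark source):

// Hypothetical helper mirroring the restored config semantics; not Spark source.
object ExchangeCoordinatorSettings {
  // Non-positive values mean "do not pass an advisory minimum to the coordinator".
  def advisoryMinPartitions(configured: Int): Option[Int] =
    if (configured > 0) Some(configured) else None

  def main(args: Array[String]): Unit = {
    println(advisoryMinPartitions(-1)) // None: the default, the coordinator decides freely
    println(advisoryMinPartitions(5))  // Some(5): e.g. set in tests to guarantee parallelism
  }
}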
@@ -28,9 +28,9 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.adaptive.PlanQueryStage
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
@@ -95,11 +95,7 @@ class QueryExecution(
    * row format conversions as needed.
    */
   protected def prepareForExecution(plan: SparkPlan): SparkPlan = {
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      adaptivePreparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    } else {
-      preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp)}
-    }
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
   }
 
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
@@ -110,15 +106,6 @@
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
 
-  protected def adaptivePreparations: Seq[Rule[SparkPlan]] = Seq(
-    PlanSubqueries(sparkSession),
-    EnsureRequirements(sparkSession.sessionState.conf),
-    ReuseSubquery(sparkSession.sessionState.conf),
-    // PlanQueryStage needs to be the last rule because it divides the plan into multiple sub-trees
-    // by inserting leaf node QueryStageInput. Transforming the plan after applying this rule will
-    // only transform node in a sub-tree.
-    PlanQueryStage(sparkSession.sessionState.conf))
-
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: AnalysisException => e.toString }

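With adaptivePreparations gone, prepareForExecution always folds the single preparations rule list over the physical plan, each rule receiving the plan produced by the previous one. A self-contained sketch of that foldLeft pattern (Plan, Rule, and record are toy stand-ins for SparkPlan and Rule[SparkPlan], not Spark source; the rule names are used only as labels):

object RulePipelineSketch {
  // Toy stand-ins, only to show how preparations.foldLeft applies rules in order.
  case class Plan(appliedRules: List[String])
  type Rule = Plan => Plan

  def record(name: String): Rule = plan => Plan(plan.appliedRules :+ name)

  val preparations: Seq[Rule] =
    Seq(record("PlanSubqueries"), record("EnsureRequirements"), record("ReuseExchange"))

  def prepareForExecution(plan: Plan): Plan =
    preparations.foldLeft(plan) { case (p, rule) => rule(p) }

  def main(args: Array[String]): Unit =
    println(prepareForExecution(Plan(Nil))) // Plan(List(PlanSubqueries, EnsureRequirements, ReuseExchange))
}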
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.adaptive.QueryStageInput
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.SQLMetricInfo
 import org.apache.spark.sql.internal.SQLConf
@@ -53,7 +52,6 @@ private[execution] object SparkPlanInfo {
   def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
     val children = plan match {
       case ReusedExchangeExec(_, child) => child :: Nil
-      case i: QueryStageInput => i.childStage :: Nil
       case _ => plan.children ++ plan.subqueries
     }
     val metrics = plan.metrics.toSeq.map { case (key, metric) =>
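With the QueryStageInput case dropped, child extraction in fromSparkPlan special-cases only reused exchanges: a reused exchange reports the plan it reuses as its single child, and every other node reports its own children plus subqueries. A standalone sketch of that match-based traversal (the Toy* classes, PlanInfo, and toInfo are illustrative, not Spark's SparkPlanInfo):

object PlanInfoSketch {
  sealed trait ToyPlan { def name: String; def children: Seq[ToyPlan] }
  case class ToyLeaf(name: String) extends ToyPlan { val children = Nil }
  case class ToyNode(name: String, children: Seq[ToyPlan]) extends ToyPlan
  // Mimics ReusedExchangeExec: a marker node whose output comes from another node's exchange.
  case class ToyReusedExchange(name: String, reused: ToyPlan) extends ToyPlan {
    val children = Seq(reused)
  }

  case class PlanInfo(nodeName: String, children: Seq[PlanInfo])

  // Same shape as the match above: reused exchanges report the reused plan as their
  // only child; every other node reports its own children.
  def toInfo(plan: ToyPlan): PlanInfo = {
    val children = plan match {
      case ToyReusedExchange(_, reused) => reused :: Nil
      case _ => plan.children
    }
    PlanInfo(plan.name, children.map(toInfo))
  }

  def main(args: Array[String]): Unit = {
    val scan = ToyLeaf("Scan")
    println(toInfo(ToyNode("Join", Seq(scan, ToyReusedExchange("ReusedExchange", scan)))))
  }
}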

This file was deleted.
