Skip to content

Commit 5f8f4ed

Browse files
committed
better naming
1 parent 5383910 commit 5f8f4ed

File tree

4 files changed

+30
-32
lines changed

4 files changed

+30
-32
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRe
3232
import org.apache.spark.sql.execution.aggregate.AggUtils
3333
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
3434
import org.apache.spark.sql.execution.command._
35-
import org.apache.spark.sql.execution.exchange.{PartitioningFlexibility, ShuffleExchangeExec}
35+
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleOrigin}
3636
import org.apache.spark.sql.execution.python._
3737
import org.apache.spark.sql.execution.streaming._
3838
import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
@@ -670,7 +670,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
670670
case logical.Repartition(numPartitions, shuffle, child) =>
671671
if (shuffle) {
672672
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
673-
planLater(child), PartitioningFlexibility.STRICT) :: Nil
673+
planLater(child), ShuffleOrigin.REPARTITION_WITH_NUM) :: Nil
674674
} else {
675675
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
676676
}
@@ -703,15 +703,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
703703
case r: logical.Range =>
704704
execution.RangeExec(r) :: Nil
705705
case r: logical.RepartitionByExpression =>
706-
val partitionFlexibility = if (r.optNumPartitions.isEmpty) {
707-
PartitioningFlexibility.PRESERVE_CLUSTERING
706+
val shuffleOrigin = if (r.optNumPartitions.isEmpty) {
707+
ShuffleOrigin.REPARTITION
708708
} else {
709-
PartitioningFlexibility.STRICT
709+
ShuffleOrigin.REPARTITION_WITH_NUM
710710
}
711-
exchange.ShuffleExchangeExec(
712-
r.partitioning,
713-
planLater(r.child),
714-
partitionFlexibility) :: Nil
711+
exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
715712
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
716713
case r: LogicalRDD =>
717714
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] {
136136

137137
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
138138
case s: ShuffleQueryStageExec =>
139-
s.shuffle.canChangeClustering && s.mapStats.isDefined
139+
s.shuffle.canChangePartitioning && s.mapStats.isDefined
140140
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
141-
s.shuffle.canChangeClustering && s.mapStats.isDefined && partitionSpecs.nonEmpty
141+
s.shuffle.canChangePartitioning && s.mapStats.isDefined && partitionSpecs.nonEmpty
142142
case _ => false
143143
}
144144
}

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,26 +56,24 @@ trait ShuffleExchangeLike extends Exchange {
5656
*/
5757
def numPartitions: Int
5858

59-
def partitioningFlexibility: PartitioningFlexibility.Value
59+
def shuffleOrigin: ShuffleOrigin.Value
6060

6161
/**
6262
* Returns whether the shuffle partition number can be changed.
6363
*/
6464
final def canChangeNumPartitions: Boolean = {
6565
// If users specify the num partitions via APIs like `repartition(5, col)`, we shouldn't change
6666
// it. `SinglePartition` requires exactly one partition, so we can't change it either.
67-
partitioningFlexibility != PartitioningFlexibility.STRICT &&
68-
outputPartitioning != SinglePartition
67+
shuffleOrigin != ShuffleOrigin.REPARTITION_WITH_NUM && outputPartitioning != SinglePartition
6968
}
7069

7170
/**
72-
* Returns whether the shuffle output clustering can be changed.
71+
* Returns whether the shuffle output data partitioning can be changed.
7372
*/
74-
final def canChangeClustering: Boolean = {
73+
final def canChangePartitioning: Boolean = {
7574
// If users specify the partitioning via APIs like `repartition(col)`, we shouldn't change it.
7675
// `SinglePartition` is itself a special partitioning, so we can't change it either.
77-
partitioningFlexibility == PartitioningFlexibility.UNSPECIFIED &&
78-
outputPartitioning != SinglePartition
76+
shuffleOrigin == ShuffleOrigin.ENSURE_REQUIREMENTS && outputPartitioning != SinglePartition
7977
}
8078

8179
/**
@@ -94,16 +92,19 @@ trait ShuffleExchangeLike extends Exchange {
9492
def runtimeStatistics: Statistics
9593
}
9694

97-
object PartitioningFlexibility extends Enumeration {
98-
type PartitioningFlexibility = Value
99-
// STRICT means we can't change the partitioning at all, including the partition number, even if
100-
// we lose performance improvement opportunity.
101-
val STRICT = Value
102-
// PRESERVE_CLUSTERING means we must preserve the data clustering even if it's useless to the
103-
// downstream operators. Shuffle partition number can be changed.
104-
val PRESERVE_CLUSTERING = Value
105-
// UNSPECIFIED means the partitioning can be changed as long as it doesn't break query semantic.
106-
val UNSPECIFIED = Value
95+
// Describes where the shuffle operator comes from.
96+
object ShuffleOrigin extends Enumeration {
97+
type ShuffleOrigin = Value
98+
// Indicates that the shuffle operator was added by the internal `EnsureRequirements` rule. It
99+
// means that the shuffle operator is used to ensure internal data partitioning requirements and
100+
// Spark is free to optimize it as long as the requirements are still ensured.
101+
val ENSURE_REQUIREMENTS = Value
102+
// Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
103+
// can still optimize it via changing shuffle partition number, as data partitioning won't change.
104+
val REPARTITION = Value
105+
// Indicates that the shuffle operator was added by the user-specified repartition operator with
106+
// a certain partition number. Spark can't optimize it.
107+
val REPARTITION_WITH_NUM = Value
107108
}
108109

109110
/**
@@ -112,7 +113,7 @@ object PartitioningFlexibility extends Enumeration {
112113
case class ShuffleExchangeExec(
113114
override val outputPartitioning: Partitioning,
114115
child: SparkPlan,
115-
partitioningFlexibility: PartitioningFlexibility.Value = PartitioningFlexibility.UNSPECIFIED)
116+
shuffleOrigin: ShuffleOrigin.Value = ShuffleOrigin.ENSURE_REQUIREMENTS)
116117
extends ShuffleExchangeLike {
117118

118119
private lazy val writeMetrics =

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
3333
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
3434
import org.apache.spark.sql.execution._
3535
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
36-
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, PartitioningFlexibility, ShuffleExchangeExec, ShuffleExchangeLike}
36+
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
3737
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
3838
import org.apache.spark.sql.internal.SQLConf
3939
import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE
@@ -766,8 +766,8 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
766766
case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleExchangeLike {
767767
override def numMappers: Int = delegate.numMappers
768768
override def numPartitions: Int = delegate.numPartitions
769-
override def partitioningFlexibility: PartitioningFlexibility.Value = {
770-
delegate.partitioningFlexibility
769+
override def shuffleOrigin: ShuffleOrigin.Value = {
770+
delegate.shuffleOrigin
771771
}
772772
override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
773773
delegate.mapOutputStatisticsFuture

0 commit comments

Comments
 (0)