Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
planLater(child), canChangeNumPartitions = false) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
Expand Down Expand Up @@ -736,7 +737,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case r: logical.Range =>
execution.RangeExec(r) :: Nil
case r: logical.RepartitionByExpression =>
exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child)) :: Nil
exchange.ShuffleExchangeExec(
r.partitioning, planLater(r.child), canChangeNumPartitions = false) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,18 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
// If not all leaf nodes are query stages, it's not safe to reduce the number of
// shuffle partitions, because we may break the assumption that all children of a spark plan
// have same number of output partitions.
return plan
}

val shuffleStages = plan.collect {
case stage: ShuffleQueryStageExec => stage
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
}
// ShuffleExchanges introduced by repartition do not support changing the number of partitions.
// We change the number of partitions in the stage only if all the ShuffleExchanges support it.
if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) {
plan
} else {
val shuffleStages = plan.collect {
case stage: ShuffleQueryStageExec => stage
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
}
val shuffleMetrics = shuffleStages.map { stage =>
val metricsFuture = stage.mapOutputStatisticsFuture
assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
Expand All @@ -76,12 +82,7 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
// `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = shuffleMetrics.filter(_ != null)
// We may get different pre-shuffle partition number if user calls repartition manually.
// We don't reduce shuffle partition number in that case.
val distinctNumPreShufflePartitions =
validMetrics.map(stats => stats.bytesByPartitionId.length).distinct

if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
if (validMetrics.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carsonwang is it safe to remove distinctNumPreShufflePartitions.length == 1 from here? I think @hvanhovell's comment about Union (https://github.com/apache/spark/pull/24978/files#r299396944) still applies here. I ran into an issue with the following plan:

Union
:- Project [id_key#236, true AS row_type#249, link#232]
:  +- Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246) AND (id_key#236 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246))
:     +- Window [min(id_key#236) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#246]
:        +- ShuffleQueryStage 5
:           +- Exchange SinglePartition, true
:              +- *(7) Project [id_key#236, link#232]
:                 +- *(7) SortMergeJoin [link#237], [link#232], Inner, (id_key#236 > id_key#230)
:                    :- *(5) Sort [link#237 ASC NULLS FIRST], false, 0
:                    :  +- CoalescedShuffleReader [0]
:                    :     +- ShuffleQueryStage 0
:                    :        +- Exchange hashpartitioning(link#237, 5), true
:                    :           +- *(1) Project [col1#224 AS id_key#236, col2#225 AS link#237]
:                    :              +- *(1) LocalTableScan [col1#224, col2#225]
:                    +- *(6) Sort [link#232 ASC NULLS FIRST], false, 0
:                       +- CoalescedShuffleReader [0]
:                          +- ShuffleQueryStage 1
:                             +- Exchange hashpartitioning(link#232, 5), true
:                                +- *(2) Project [id_key#230, link#232]
:                                   +- *(2) Filter (isnotnull(link#232) AND isnotnull(id_key#230))
:                                      +- *(2) Scan RecursiveReference iter[id_key#230,row_type#231,link#232]
+- Project [id_key#240, new AS new#256, link#241]
   +- SortMergeJoin [id_key#238], [id_key#240], Inner
      :- Sort [id_key#238 ASC NULLS FIRST], false, 0
      :  +- ShuffleQueryStage 4
      :     +- Exchange hashpartitioning(id_key#238, 5), true
      :        +- *(4) Project [id_key#238]
      :           +- *(4) Filter (isnotnull(min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247) AND (id_key#238 = min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247))
      :              +- Window [min(id_key#238) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id_key) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#247]
      :                 +- CoalescedShuffleReader [0]
      :                    +- ShuffleQueryStage 2
      :                       +- Exchange SinglePartition, true
      :                          +- LocalTableScan <empty>, [id_key#238]
      +- Sort [id_key#240 ASC NULLS FIRST], false, 0
         +- ShuffleQueryStage 3
            +- Exchange hashpartitioning(id_key#240, 5), true
               +- *(3) Project [col1#228 AS id_key#240, col2#229 AS link#241]
                  +- *(3) LocalTableScan [col1#228, col2#229]

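Here ShuffleQueryStage 5 (Exchange SinglePartition) conflicts with ShuffleQueryStage 4 and ShuffleQueryStage 3 (both Exchange hashpartitioning(..., 5)), because their partition counts differ.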

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, SinglePartition is an exception, so it's still possible to hit distinctNumPreShufflePartitions.length > 1 here. Let's add this check back, @carsonwang @maryannxue.

thanks for reporting it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened a small follow-up PR, #25479; please let me know if this requires a new ticket.

val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
child match {
// If child is an exchange, we replace it with a new one having defaultPartitioning.
case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c)
case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
case _ => ShuffleExchangeExec(defaultPartitioning, child)
}
}
Expand Down Expand Up @@ -191,7 +191,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
// TODO: remove this after we create a physical operator for `RepartitionByExpression`.
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
child.outputPartitioning match {
case lower: HashPartitioning if upper.semanticEquals(lower) => child
case _ => operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordCo
*/
case class ShuffleExchangeExec(
override val outputPartitioning: Partitioning,
child: SparkPlan) extends Exchange {
child: SparkPlan,
canChangeNumPartitions: Boolean = true) extends Exchange {

// NOTE: coordinator can be null after serialization/deserialization,
// e.g. it can be null on the Executor side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val agg = cp.groupBy('id % 2).agg(count('id))

agg.queryExecution.executedPlan.collectFirst {
case ShuffleExchangeExec(_, _: RDDScanExec) =>
case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
case BroadcastExchangeExec(_, _: RDDScanExec) =>
}.foreach { _ =>
fail(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,22 +574,17 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
withSparkSession(test, 4, None)
}

test("Union two datasets with different pre-shuffle partition number") {
test("Do not reduce the number of shuffle partition for repartition") {
val test: SparkSession => Unit = { spark: SparkSession =>
val dataset1 = spark.range(3)
val dataset2 = spark.range(3)

val resultDf = dataset1.repartition(2, dataset1.col("id"))
.union(dataset2.repartition(3, dataset2.col("id"))).toDF()
val ds = spark.range(3)
val resultDf = ds.repartition(2, ds.col("id")).toDF()

checkAnswer(resultDf,
Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
Seq(0, 1, 2).map(i => Row(i)))
val finalPlan = resultDf.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
// As the pre-shuffle partition number are different, we will skip reducing
// the shuffle partition numbers.
assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 0)
}
withSparkSession(test, 100, None)
withSparkSession(test, 200, None)
}
}