@@ -82,7 +82,12 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
// `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
// we should skip it when calculating the `partitionStartIndices`.
val validMetrics = shuffleMetrics.filter(_ != null)
- if (validMetrics.nonEmpty) {
+ // We may have different pre-shuffle partition numbers; don't reduce the shuffle partition number

Contributor:

Let's also give an example of when we will have different pre-shuffle partition numbers.

Contributor Author:

Ok, added. Please let me know if it should be more detailed.

+ // in that case. For example, when we union fully aggregated data (arranged in a single
+ // partition) and the result of a SortMergeJoin (multiple partitions).
+ val distinctNumPreShufflePartitions =
+   validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
+ if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {

Member @viirya (Aug 16, 2019):

Now that we have the condition distinctNumPreShufflePartitions.length == 1, do we still need the assert at L136? Shall we remove the assert?

Contributor Author @peter-toth (Aug 17, 2019):

Yes, we could remove it, but the assert has been there since the original version of ReduceNumShufflePartitions, where the distinctNumPreShufflePartitions.length == 1 check was also included. I'm not sure what the plan is with ReduceNumShufflePartitions. @carsonwang, @maryannxue, do you want to improve the Union/SinglePartition handling in this rule? Shall we remove the assert?

Contributor:

I think it is fine to remove it. We can improve the handling of Union/SinglePartition in the future; that probably needs more changes and a new function to estimate the partition start indices.
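
For context, the reduction this rule performs can be sketched as follows; this is a simplified illustration, not Spark's actual estimatePartitionStartIndices, and the helper name and signature here are hypothetical. The idea: walk the pre-shuffle partitions in order, sum the bytes every stage reported for that partition, and start a new post-shuffle partition whenever the running total would exceed a target size. Summing partition i across every stage is exactly why all stages must report the same pre-shuffle partition count, which is the invariant the assert checks.

// A simplified sketch of estimating partition start indices;
// not Spark's actual implementation.
def estimateStartIndices(
    bytesByPartitionId: Seq[Array[Long]], // one entry per shuffle stage
    targetSize: Long): Array[Int] = {
  val numPreShufflePartitions = bytesByPartitionId.head.length
  val startIndices = scala.collection.mutable.ArrayBuffer(0)
  var runningSize = 0L
  for (i <- 0 until numPreShufflePartitions) {
    // Partition i is combined across every stage, so all stages need the
    // same pre-shuffle partition count.
    val size = bytesByPartitionId.map(_(i)).sum
    if (i > 0 && runningSize + size > targetSize) {
      startIndices += i // start a new post-shuffle partition at i
      runningSize = 0L
    }
    runningSize += size
  }
  startIndices.toArray
}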

val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
// This transformation adds new nodes, so we must use `transformUp` here.
plan.transformUp {
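
To make the new guard concrete, here is a minimal standalone sketch; Stats and canReduce are hypothetical stand-ins for MapOutputStatistics and the condition inlined in the rule above.

// Reduction is only attempted when every non-null shuffle stage reports
// the same pre-shuffle partition count.
case class Stats(bytesByPartitionId: Array[Long])

def canReduce(validMetrics: Seq[Stats]): Boolean = {
  val distinctNumPreShufflePartitions =
    validMetrics.map(_.bytesByPartitionId.length).distinct
  validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1
}

// A single-partition aggregate unioned with a 5-partition join side: skip.
assert(!canReduce(Seq(Stats(Array(100L)), Stats(Array.fill(5)(20L)))))
// Two stages with matching partition counts: safe to reduce.
assert(canReduce(Seq(Stats(Array.fill(5)(20L)), Stats(Array.fill(5)(20L)))))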
@@ -587,4 +587,22 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA
}
withSparkSession(test, 200, None)
}

test("Union two datasets with different pre-shuffle partition number") {
val test: SparkSession => Unit = { spark: SparkSession =>
val df1 = spark.range(3).join(spark.range(3), "id").toDF()
val df2 = spark.range(3).groupBy().sum()

val resultDf = df1.union(df2)

checkAnswer(resultDf, Seq((0), (1), (2), (3)).map(i => Row(i)))

Contributor:

Does this fail without the fix?

Contributor Author @peter-toth (Aug 16, 2019):

It does. The plan is:

AdaptiveSparkPlan(isFinalPlan=false)
+- Union
   :- Project [id#0L]
   :  +- SortMergeJoin [id#0L], [id#2L], Inner
   :     :- Sort [id#0L ASC NULLS FIRST], false, 0
   :     :  +- Exchange hashpartitioning(id#0L, 5), true
   :     :     +- Range (0, 3, step=1, splits=12)
   :     +- Sort [id#2L ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(id#2L, 5), true
   :           +- Range (0, 3, step=1, splits=12)
   +- HashAggregate(keys=[], functions=[sum(id#6L)], output=[sum(id)#10L])
      +- Exchange SinglePartition, true
         +- HashAggregate(keys=[], functions=[partial_sum(id#6L)], output=[sum#14L])
            +- Range (0, 3, step=1, splits=12)

and the error comes from this assert: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala#L136
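
For anyone who wants to reproduce it, here is a hypothetical spark-shell session that builds the same query shape, assuming a build where adaptive execution is gated behind spark.sql.adaptive.enabled:

// Union a fully aggregated (single-partition) dataset with a
// SortMergeJoin result that shuffles into multiple partitions.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "5")

val joined = spark.range(3).join(spark.range(3), "id").toDF()
val aggregated = spark.range(3).groupBy().sum()

// Without the fix this hits the assert in ReduceNumShufflePartitions;
// with it, the query runs and the rule simply skips coalescing.
joined.union(aggregated).show()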

Contributor:

Can you fill in the "Does this PR introduce any user-facing change?" section? Changing a query from failing to runnable is a user-facing change.

Contributor Author:

Oh ok, sure, filled.


+     val finalPlan = resultDf.queryExecution.executedPlan
+       .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+     // As the pre-shuffle partition numbers are different, we will skip
+     // reducing the shuffle partition numbers.
+     assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.isEmpty)
+   }
+   withSparkSession(test, 100, None)
+ }
}