
Conversation

@manuzhang
Member

What changes were proposed in this pull request?

As suggested by @cloud-fan in #28916 (comment), apply CoalesceShufflePartitions with partitionSpecs of Nil when ShuffleQueryStageExec#mapStats is None.
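
A minimal sketch of the idea, reusing the names from the diff excerpt quoted later in this thread (the surrounding rule structure is assumed, not the exact Spark source):

// Inside CoalesceShufflePartitions (sketch). mapStats is None when a
// shuffle's input RDD has 0 partitions, so flatMap drops such stages.
val validMetrics = shuffleStages.flatMap(_.mapStats)
if (validMetrics.isEmpty) {
  // All inputs are empty: rewrite the shuffle reads with an empty
  // partition-spec list instead of skipping the rule and launching
  // spark.sql.shuffle.partitions empty tasks.
  updatePlan(Nil)
}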

Why are the changes needed?

For SQL like

SELECT b, COUNT(t1.a) as cnt
FROM t1
INNER JOIN t2
ON t1.id = t2.id
WHERE t1.id > 10
GROUP BY b

when all ids of t1 are smaller than 10, many unnecessary tasks are launched for the final shuffle stage, because CoalesceShufflePartitions is skipped when the input RDD has 0 partitions.

Before

[screenshot: final shuffle stage before the change]

After

[screenshot: final shuffle stage after the change]

Does this PR introduce any user-facing change?

No

How was this patch tested?

Updated tests.

}
}

if (validMetrics.isEmpty) {
Contributor

If a query stage has multiple leaf shuffles and only one of them has a 0-partition input RDD, what shall we do?

Member Author

I think it's like coalescing one fewer shuffle, and that is handled by the nonEmpty code path.
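
For illustration only (nonEmptyStageStats is a hypothetical value, not code from this PR): collecting stats through Option silently drops the stage that has no map stats, so the remaining shuffles still get coalesced together:

// Two leaf shuffles, one of them with a 0-partition input (no map stats):
val stats = Seq(None, Some(nonEmptyStageStats))
val validMetrics = stats.flatten  // keeps only the stage that has stats
assert(validMetrics.nonEmpty)     // so the existing nonEmpty path coalesces it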

@cloud-fan
Contributor

cc @maryannxue @JkSelf @viirya

}

if (validMetrics.isEmpty) {
  updatePlan(Nil)
@viirya
Member

viirya commented Jun 30, 2020

Can you add a comment for the 0-partition case?

@SparkQA

SparkQA commented Jun 30, 2020

Test build #124648 has finished for PR 28954 at commit 5bf6de5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cloud-fan commented Jul 1, 2020

After more thought, maybe a better way is to add a new rule in AdaptiveSparkPlanExec.optimizer that converts a LogicalQueryStage to an empty LocalRelation if its size is 0.

This is not really "coalescing partitions", so we'd better not do it in CoalesceShufflePartitions.
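
A rough sketch of what such a rule could look like (the rule name and the zero-size check are assumptions for illustration, not Spark's eventual implementation):

import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.adaptive.LogicalQueryStage

// Hypothetical rule for AdaptiveSparkPlanExec.optimizer: once a query stage
// has materialized and its output is known to be empty, replace it with an
// empty LocalRelation so downstream operators can be simplified away.
object ConvertEmptyStageToLocalRelation extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case stage: LogicalQueryStage if stage.stats.sizeInBytes == 0 =>
      LocalRelation(stage.output)
  }
}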

@JkSelf
Contributor

JkSelf commented Jul 1, 2020

@manuzhang I ran test("Empty stage coalesced to 0-partition RDD") in AdaptiveQueryExecSuite. It seems there are no unnecessary tasks for the empty partitions; the Spark UI is shown below. Can you give a simple example to reproduce this issue? Thanks.
[screenshot: Spark UI]

The related code is:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
val result = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
result.show()

@manuzhang
Member Author

@JkSelf Try changing result.show() to result.collect()

@JkSelf
Contributor

JkSelf commented Jul 1, 2020

@manuzhang It seems there is still only one stage and no unnecessary tasks for the empty partitions.
[screenshot: Spark UI]

related code:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
val result = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
result.collect()
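
As a generic cross-check (not from this thread), the final adaptive plan can also be inspected directly instead of relying on the UI:

// With the fix, the shuffle read after the join should show 0 (or very few)
// partitions rather than the full spark.sql.shuffle.partitions count.
println(result.queryExecution.executedPlan)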

@manuzhang
Member Author

@JkSelf Could you check the jobs tab?

@JkSelf
Contributor

JkSelf commented Jul 2, 2020

@manuzhang Here is the jobs tab.
[screenshot: jobs tab]

@viirya
Member

viirya commented Jul 2, 2020

When the input RDD has 0 partitions, why would we launch unnecessary tasks even though CoalesceShufflePartitions is skipped?

@manuzhang
Member Author

manuzhang commented Jul 3, 2020

@JkSelf @viirya
Here is part of the SQL UI from running the same example with the default number of shuffle partitions (the binary is built from the master branch as of June 29). You can see SortMergeJoin is followed by an Exchange with 200 partitions.

[screenshot: SQL UI showing SortMergeJoin followed by a 200-partition Exchange]

@manuzhang
Member Author

@JkSelf
I get the same result as you when I set the numPartitions of the source to 1 via the fourth argument of spark.range (by default, it's 16 on my Mac), i.e.

val df1 = spark.range(0, 10, 1, 1).withColumn("a", 'id)
val df2 = spark.range(0, 10, 1, 1).withColumn("b", 'id)
val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
testDf.collect()

Compare this execution plan with the one above.
[screenshot: execution plan with a single-partition source]

@manuzhang
Member Author

@cloud-fan I found an issue with updating metrics when converting a LogicalQueryStage to a LocalRelation if its child's mapStats is empty.

java.util.NoSuchElementException: key not found: 514
	at scala.collection.immutable.Map$Map1.apply(Map.scala:114)
	at org.apache.spark.sql.execution.ui.SQLAppStatusListener.$anonfun$aggregateMetrics$11(SQLAppStatusListener.scala:257)

The missing key comes from the metrics of a RangeExec that were sent to SQLAppStatusListener earlier, presumably because the replaced nodes (and their metric ids) no longer appear in the final plan.

@manuzhang closed this Aug 14, 2020