
Conversation

@manuzhang
Member

@manuzhang manuzhang commented Jun 24, 2020

What changes were proposed in this pull request?

This PR creates one partition spec in ShufflePartitionsUtil if all inputs are empty, which avoids launching as many unnecessary tasks as there are shuffle partitions in the following stages.

Why are the changes needed?

For SQL like

```sql
SELECT b, COUNT(t1.a) AS cnt
FROM t1
INNER JOIN t2
ON t1.id = t2.id
WHERE t1.id > 10
GROUP BY b
```

when all ids of t1 are smaller than 10, no tasks are launched for the join since its empty input is coalesced to 0 partitions. However, many unnecessary tasks could still be launched for the following aggregate execution. Hence, I'm proposing to coalesce to one partition when all partitions are empty.
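A hedged repro sketch (table names and data are assumed for illustration, not taken from the PR):

```scala
// Hypothetical repro (assumed data): every t1.id is <= 10, so the filtered
// join input is empty, yet the GROUP BY stage can still launch as many
// tasks as there are shuffle partitions.
import spark.implicits._
spark.conf.set("spark.sql.adaptive.enabled", "true")
Seq((1, "x"), (2, "y")).toDF("id", "a").createOrReplaceTempView("t1")
Seq((1, "u"), (2, "v")).toDF("id", "b").createOrReplaceTempView("t2")
spark.sql(
  """SELECT b, COUNT(t1.a) AS cnt
    |FROM t1 INNER JOIN t2 ON t1.id = t2.id
    |WHERE t1.id > 10
    |GROUP BY b""".stripMargin).collect()
```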

Before

[screenshot]

After

[screenshot]

Does this PR introduce any user-facing change?

No.

How was this patch tested?

updated tests.

@manuzhang
Member Author

cc @cloud-fan @maryannxue @JkSelf

```scala
def createPartitionSpec(last: Boolean = false): Unit = {
  // Skip empty inputs, as it is a waste to launch an empty task,
  // unless all inputs are empty.
  if (coalescedSize > 0 || (last && partitionSpecs.isEmpty)) {
```
Contributor

@cloud-fan cloud-fan Jun 24, 2020


so you want to create at least one partition? This doesn't match the PR description.

Member Author


Yes, one partition if all partitions are empty. This creates one partition spec at the end, when no partition specs have been created so far.

@SparkQA

SparkQA commented Jun 24, 2020

Test build #124473 has finished for PR 28916 at commit 84031c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

> which avoids launching as many unnecessary tasks as `spark.sql.adaptive.coalescePartitions.initialPartitionNum`

Why would having no partitions cause this?

@manuzhang
Member Author

IIUC, stages after coalescing will be submitted in a separate job with the default number of partitions when the input has 0 partitions:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L68
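For reference, a paraphrase of the linked code (a sketch from memory of the Spark source at the time, not an exact quote):

```scala
// ShuffleExchangeExec (paraphrased): with a 0-partition input there is no
// map stage to submit, so the future completes with null and
// ShuffleQueryStageExec#mapStats ends up None downstream.
@transient lazy val mapOutputStatisticsFuture: Future[MapOutputStatistics] = {
  if (inputRDD.getNumPartitions == 0) {
    Future.successful(null)
  } else {
    sparkContext.submitMapStage(shuffleDependency)
  }
}
```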

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Jun 28, 2020

Test build #124588 has finished for PR 28916 at commit 84031c1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please

@viirya
Member

viirya commented Jun 28, 2020

Same question. When partitionSpecs is empty, I think CustomShuffleReaderExec produces an RDD with empty partitions. Why would there be many unnecessary tasks?

@SparkQA

SparkQA commented Jun 29, 2020

Test build #124607 has finished for PR 28916 at commit 84031c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@cloud-fan
Contributor

Ideally we should launch no task for empty partitions. Launching one task is still not the best solution.

@viirya
Member

viirya commented Jun 29, 2020

It's because ShuffledRowRDD is created with the default number of shuffle partitions here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L98

When partitionSpecs is empty, CustomShuffleReaderExec creates a ShuffledRowRDD with empty partitionSpecs:

```scala
new ShuffledRowRDD(
  stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
```

```scala
override def getPartitions: Array[Partition] = {
  Array.tabulate[Partition](partitionSpecs.length) { i =>
    ShuffledRowRDDPartition(i, partitionSpecs(i))
  }
}
```

The shuffle is changed by AQE, and CustomShuffleReaderExec replaces the original ShuffleExchangeExec, so I think the code you pointed to is superseded by the code path above. It should therefore produce empty partitions.

@cloud-fan
Contributor

@manuzhang can you check the Spark web UI and make sure AQE does launch tasks for empty partitions?

@manuzhang
Member Author

@cloud-fan Yes, also seen in the UT log before this change (I enabled the lineage log):

===== TEST OUTPUT FOR o.a.s.sql.execution.adaptive.AdaptiveQueryExecSuite: 'Empty stage coalesced to 0-partition RDD' =====
...
22:06:00.535 ScalaTest-run-running-AdaptiveQueryExecSuite INFO ShufflePartitionsUtil: For shuffle(0, 1), advisory target size: 67108864, actual target size 16.
22:06:00.683 ScalaTest-run-running-AdaptiveQueryExecSuite INFO CodeGenerator: Code generated in 88.071331 ms
22:06:00.700 ScalaTest-run-running-AdaptiveQueryExecSuite INFO CodeGenerator: Code generated in 11.333313 ms
22:06:00.752 ScalaTest-run-running-AdaptiveQueryExecSuite INFO CodeGenerator: Code generated in 9.213245 ms
22:06:00.801 ScalaTest-run-running-AdaptiveQueryExecSuite INFO CodeGenerator: Code generated in 12.257591 ms
22:06:00.855 ScalaTest-run-running-AdaptiveQueryExecSuite INFO SparkContext: Starting job: apply at OutcomeOf.scala:85
22:06:00.858 ScalaTest-run-running-AdaptiveQueryExecSuite INFO SparkContext: RDD's recursive dependencies:
(5) MapPartitionsRDD[43] at apply at OutcomeOf.scala:85 []
 |  SQLExecutionRDD[42] at apply at OutcomeOf.scala:85 []
 |  MapPartitionsRDD[41] at apply at OutcomeOf.scala:85 []
 |  MapPartitionsRDD[40] at apply at OutcomeOf.scala:85 []
 |  ShuffledRowRDD[39] at apply at OutcomeOf.scala:85 []
 +-(0) MapPartitionsRDD[38] at apply at OutcomeOf.scala:85 []
    |  MapPartitionsRDD[37] at apply at OutcomeOf.scala:85 []
    |  ZippedPartitionsRDD2[36] at apply at OutcomeOf.scala:85 []
    |  MapPartitionsRDD[33] at apply at OutcomeOf.scala:85 []
    |  ShuffledRowRDD[32] at apply at OutcomeOf.scala:85 []
    +-(2) MapPartitionsRDD[27] at apply at OutcomeOf.scala:85 []
       |  MapPartitionsRDD[26] at apply at OutcomeOf.scala:85 []
       |  MapPartitionsRDD[25] at apply at OutcomeOf.scala:85 []
       |  ParallelCollectionRDD[24] at apply at OutcomeOf.scala:85 []
    |  MapPartitionsRDD[35] at apply at OutcomeOf.scala:85 []
    |  ShuffledRowRDD[34] at apply at OutcomeOf.scala:85 []
    +-(2) MapPartitionsRDD[31] at apply at OutcomeOf.scala:85 []
       |  MapPartitionsRDD[30] at apply at OutcomeOf.scala:85 []
       |  MapPartitionsRDD[29] at apply at OutcomeOf.scala:85 []
       |  ParallelCollectionRDD[28] at apply at OutcomeOf.scala:85 []
22:06:00.860 dag-scheduler-event-loop INFO DAGScheduler: Registering RDD 38 (apply at OutcomeOf.scala:85) as input to shuffle 2
22:06:00.861 dag-scheduler-event-loop INFO DAGScheduler: Got job 2 (apply at OutcomeOf.scala:85) with 5 output partitions
22:06:00.861 dag-scheduler-event-loop INFO DAGScheduler: Final stage: ResultStage 5 (apply at OutcomeOf.scala:85)
22:06:00.861 dag-scheduler-event-loop INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 4)
22:06:00.862 dag-scheduler-event-loop INFO DAGScheduler: Missing parents: List()
22:06:00.863 dag-scheduler-event-loop INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[43] at apply at OutcomeOf.scala:85), which has no missing parents

@cloud-fan
Contributor

@manuzhang can you check the web UI as well?

@manuzhang
Member Author

@cloud-fan here it is:

[Spark web UI screenshot]

@cloud-fan
Contributor

I checked the related code and came to the same conclusion as @viirya. Can you elaborate on how this happens?

@manuzhang
Member Author

The above log is from this UT:

```scala
df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
```

I was not saying that tasks were launched for the stage with the coalesced empty partition, but for the stage consuming the output of the empty partition, which I believe is the execution of the groupBy('a).count() part.

@manuzhang
Member Author

@viirya @cloud-fan
I've updated the PR description with an example. This is more of an improvement I'm proposing for certain cases. Please let me know whether it makes sense.

@cloud-fan
Contributor

I think the key problem is that we skip CoalesceShufflePartitions when ShuffleQueryStageExec#mapStats is None. This can happen when the input RDD of the shuffle has 0 partitions. I think we should still apply CoalesceShufflePartitions in this case and wrap ShuffleQueryStageExec with a CustomShuffleReaderExec whose partitionSpecs is Nil.
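A rough sketch of that suggestion (hypothetical shape; the rule structure and constructor arguments are approximated, not the actual patch):

```scala
// Hypothetical sketch, not the actual Spark patch: instead of skipping the
// rule when mapStats is None (0-partition input), still wrap the stage in a
// shuffle reader, with an empty spec list so that downstream stages see zero
// partitions rather than the default number.
plan.transformUp {
  case stage: ShuffleQueryStageExec if stage.mapStats.isEmpty =>
    CustomShuffleReaderExec(stage, partitionSpecs = Nil) // arguments approximated
}
```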

@manuzhang
Member Author

Thanks for pointing that out. Let me try with a new PR.

@manuzhang
Member Author

@cloud-fan @viirya Please help review the new PR #28954.

@manuzhang manuzhang closed this Jun 30, 2020
cloud-fan added a commit that referenced this pull request Jul 31, 2020
### What changes were proposed in this pull request?

This PR updates the AQE framework to at least return one partition during coalescing.

This PR also updates `ShuffleExchangeExec.canChangeNumPartitions` to not coalesce for `SinglePartition`.

### Why are the changes needed?

It's a bit risky to return 0 partitions, as sometimes it's different from empty data. For example, a global aggregate will return one result row even if the input table is empty. If there are 0 partitions, no tasks will be run and no result will be returned. More specifically, the global aggregate requires `AllTuples`, and we can't coalesce to 0 partitions.
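To illustrate (a minimal example with assumed setup, not from this PR): a global aggregate over an empty input still returns exactly one row, so at least one task must run.

```scala
// Minimal illustration (assumed setup): count over an empty DataFrame
// returns one row containing 0, so the final stage needs at least one task.
import org.apache.spark.sql.functions.{col, count}
val empty = spark.range(10).filter("id > 10") // no rows survive the filter
empty.agg(count(col("id"))).show()
// +---------+
// |count(id)|
// +---------+
// |        0|
// +---------+
```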

This is not a real bug for now. The global aggregate will be planned as partial and final physical agg nodes. The partial agg will return at least one row, so the shuffle still has data. But it's better to fix this issue to avoid potential bugs in the future.

According to #28916, this change also fixes some perf problems.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test.

Closes #29307 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan added a commit to cloud-fan/spark that referenced this pull request Jul 31, 2020
a0x8o added a commit to a0x8o/spark that referenced this pull request Jul 31, 2020
cloud-fan added a commit to cloud-fan/spark that referenced this pull request Aug 3, 2020