Skip to content

Conversation

@xuanyuanking
Copy link
Member

@xuanyuanking xuanyuanking commented Mar 30, 2020

What changes were proposed in this pull request?

This reverts commit 8cf76f8. #25962

Why are the changes needed?

In SPARK-29285, we change to create shuffle temporary eagerly. This is helpful for not to fail the entire task in the scenario of occasional disk failure. But for the applications that many tasks don't actually create shuffle files, it caused overhead. See the below benchmark:
Env: Spark local-cluster[2, 4, 19968], each queries run 5 round, each round 5 times.
Data: TPC-DS scale=99 generate by spark-tpcds-datagen
Results:

Base Revert
Q20 Vector(4.096865667, 2.76231748, 2.722007606, 2.514433591, 2.400373579) Median 2.722007606 Vector(3.763185446, 2.586498463, 2.593472842, 2.320522846, 2.224627274) Median 2.586498463
Q33 Vector(5.872176321, 4.854397586, 4.568787136, 4.393378146, 4.423996818) Median 4.568787136 Vector(5.38746785, 4.361236877, 4.082311276, 3.867206824, 3.783188024) Median 4.082311276
Q52 Vector(3.978870321, 3.225437871, 3.282411608, 2.869674887, 2.644490664) Median 3.225437871 Vector(4.000381522, 3.196025108, 3.248787619, 2.767444508, 2.606163423) Median 3.196025108
Q56 Vector(6.238045133, 4.820535173, 4.609965579, 4.313509894, 4.221256227) Median 4.609965579 Vector(6.241611339, 4.225592467, 4.195202502, 3.757085755, 3.657525982) Median 4.195202502

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

@xuanyuanking
Copy link
Member Author

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@SparkQA
Copy link

SparkQA commented Mar 30, 2020

Test build #120596 has finished for PR 28072 at commit 124e6ce.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Copy link
Contributor

Can you please update the description to say why you think this caused the regression.

Personally I would prefer to see a separate jira filed for reverting this since it has been in for a while and we put out to preview releases with it in. Put the details in there and link the jiras so that someone looking at these can figure out what's going on. I don't think there is official policy on this but if others know of one please let me know.

@dongjoon-hyun
Copy link
Member

Shall we remove the controversial Q32 result from PR description? Technically, the median value seems to be greater than the baseline due to the one outlier.

@dongjoon-hyun
Copy link
Member

I also agree with @tgravescs 's opinion.

Personally I would prefer to see a separate jira filed for reverting this

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. +1 for removing Q32, and filing a JIRA

@xuanyuanking xuanyuanking changed the title Revert [SPARK-29285][SHUFFLE] Temporary shuffle files should be able to handle disk failures [SPARK-31314][CORE] Revert SPARK-29285 to fix shuffle regression caused by creating temporary file eagerly Mar 31, 2020
@xuanyuanking
Copy link
Member Author

@tgravescs Thanks for the advice, a sperate Jira for this revert is necessary.
@dongjoon-hyun Thanks for checking, removed Q32 in the description.

@cloud-fan cloud-fan closed this in 07c5078 Mar 31, 2020
@cloud-fan
Copy link
Contributor

thanks, merging to master/3.0!

cloud-fan pushed a commit that referenced this pull request Mar 31, 2020
…ed by creating temporary file eagerly

### What changes were proposed in this pull request?
This reverts commit 8cf76f8. #25962

### Why are the changes needed?
In SPARK-29285, we change to create shuffle temporary eagerly. This is helpful for not to fail the entire task in the scenario of occasional disk failure. But for the applications that many tasks don't actually create shuffle files, it caused overhead. See the below benchmark:
Env: Spark local-cluster[2, 4, 19968], each queries run 5 round, each round 5 times.
Data: TPC-DS scale=99 generate by spark-tpcds-datagen
Results:
|     | Base                                                                                        | Revert                                                                                      |
|-----|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------|
| Q20 | Vector(4.096865667, 2.76231748, 2.722007606, 2.514433591, 2.400373579)  Median 2.722007606  | Vector(3.763185446, 2.586498463, 2.593472842, 2.320522846, 2.224627274)  Median 2.586498463 |
| Q33 | Vector(5.872176321, 4.854397586, 4.568787136, 4.393378146, 4.423996818)  Median 4.568787136 | Vector(5.38746785, 4.361236877, 4.082311276, 3.867206824, 3.783188024)  Median 4.082311276  |
| Q52 | Vector(3.978870321, 3.225437871, 3.282411608, 2.869674887, 2.644490664)  Median 3.225437871 | Vector(4.000381522, 3.196025108, 3.248787619, 2.767444508, 2.606163423)  Median 3.196025108 |
| Q56 | Vector(6.238045133, 4.820535173, 4.609965579, 4.313509894, 4.221256227)  Median 4.609965579 | Vector(6.241611339, 4.225592467, 4.195202502, 3.757085755, 3.657525982)  Median 4.195202502 |

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #28072 from xuanyuanking/SPARK-29285-revert.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 07c5078)
Signed-off-by: Wenchen Fan <[email protected]>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…ed by creating temporary file eagerly

### What changes were proposed in this pull request?
This reverts commit 8cf76f8. apache#25962

### Why are the changes needed?
In SPARK-29285, we change to create shuffle temporary eagerly. This is helpful for not to fail the entire task in the scenario of occasional disk failure. But for the applications that many tasks don't actually create shuffle files, it caused overhead. See the below benchmark:
Env: Spark local-cluster[2, 4, 19968], each queries run 5 round, each round 5 times.
Data: TPC-DS scale=99 generate by spark-tpcds-datagen
Results:
|     | Base                                                                                        | Revert                                                                                      |
|-----|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------|
| Q20 | Vector(4.096865667, 2.76231748, 2.722007606, 2.514433591, 2.400373579)  Median 2.722007606  | Vector(3.763185446, 2.586498463, 2.593472842, 2.320522846, 2.224627274)  Median 2.586498463 |
| Q33 | Vector(5.872176321, 4.854397586, 4.568787136, 4.393378146, 4.423996818)  Median 4.568787136 | Vector(5.38746785, 4.361236877, 4.082311276, 3.867206824, 3.783188024)  Median 4.082311276  |
| Q52 | Vector(3.978870321, 3.225437871, 3.282411608, 2.869674887, 2.644490664)  Median 3.225437871 | Vector(4.000381522, 3.196025108, 3.248787619, 2.767444508, 2.606163423)  Median 3.196025108 |
| Q56 | Vector(6.238045133, 4.820535173, 4.609965579, 4.313509894, 4.221256227)  Median 4.609965579 | Vector(6.241611339, 4.225592467, 4.195202502, 3.757085755, 3.657525982)  Median 4.195202502 |

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests.

Closes apache#28072 from xuanyuanking/SPARK-29285-revert.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@zhuqi-lucas
Copy link
Contributor

cc @xuanyuanking @cloud-fan @Ngone51 @tgravescs @dongjoon-hyun

Since this has been reverted, i meet the disk failure in our production clusters, how can we handle the disk failed problem without this.

There are many disks in yarn clusters, but if one disk failure happend, we just retry the task, if we can avoid retry to the same failed disk in one node? Or if spark has some disk blacklist solution now?

And reverted solution causes that applications with many tasks don't actually create shuffle files, it caused overhead, if we can get a workaround solution to avoid create when tasks don't need temp shuffle files, i still think we should handle this.

The logs are:

DAGScheduler: ShuffleMapStage 521 (insertInto at Tools.scala:147) failed in 4.995 s due to Job aborted due to stage failure: Task 30 in stage 521.0 failed 4 times, most recent failure: Lost task 30.3 in stage 521.0 (TID 127941, ********** 91): java.io.FileNotFoundException: /data2/yarn/local/usercache/aa/appcache/*****/blockmgr-eb5ca215-a7af-41be-87ee-89fd7e3b1de5/0e/temp_shuffle_45279ef1-5143-4632-9df0-d7ee1f50c026 (Input/output error)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.(FileOutputStream.java:213)
at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks.

@Ngone51
Copy link
Member

Ngone51 commented Aug 6, 2021

@zhuqi-lucas
Spark has the exclusion (a.k.a blacklisting) feature (see spark.excludeOnFailure.enabled) and Spark on YARN also has its own exclusion feature (see spark.yarn.executor.launch.excludeOnFailure.enabled) to ban such problematic nodes. You can check the configuration here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants