
Conversation

@yaooqinn
Member

What changes were proposed in this pull request?

The getFile method in DiskBlockManager may return a file under a subdirectory that already exists. But when a disk failure occurs on that subdirectory, the file becomes inaccessible.
A FileNotFoundException like the following then usually tears down the entire task, which is rather heavy.

java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-cc4689f5-eddd-4b99-8af4-4166a86ec30b/10/temp_shuffle_79be5049-d1d5-4a81-8e67-4ef236d3834f (No such file or directory)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(FileOutputStream.java:270)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:249)
	at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:209)
	at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:416)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:230)
	at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

This change pre-touches the temporary file to check whether the parent directory is available. If it is not, we try another, possibly healthy, disk until we reach the maximum number of attempts.
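Below is a minimal sketch of the idea, assuming a `getFile` that maps a block name to a path under one of the local dirs; the helper names, signatures and loop shape are illustrative, not the exact code in this PR:

```scala
import java.io.{File, IOException}
import java.util.UUID

// Pre-touch: try to create the file; an unusable parent dir surfaces as an IOException.
def canCreateFile(file: File): Boolean = {
  try {
    file.createNewFile()  // returns true if created; throws if the parent dir is unusable
  } catch {
    case _: IOException => false
  }
}

// Retry with a fresh random temp name, which very likely hashes to a different directory.
def createTempShuffleFile(getFile: String => File, maxAttempts: Int = 10): File = {
  var name = s"temp_shuffle_${UUID.randomUUID()}"
  var file = getFile(name)
  var attempts = 0
  while (!canCreateFile(file) && attempts < maxAttempts) {
    name = s"temp_shuffle_${UUID.randomUUID()}"  // new name -> likely another disk/sub dir
    file = getFile(name)
    attempts += 1
  }
  file  // may still sit on a bad disk if every attempt failed
}
```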

Why are the changes needed?

Re-running the whole task is much heavier than picking another healthy disk for the temporary output.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test.

@SparkQA

SparkQA commented Sep 29, 2019

Test build #111556 has finished for PR 25962 at commit ced8539.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 29, 2019

Test build #111562 has finished for PR 25962 at commit 1fc204f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 30, 2019

Test build #111584 has finished for PR 25962 at commit 7b080fb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yaooqinn
Member Author

cc @cloud-fan

@yaooqinn
Member Author

cc @squito

val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
Contributor

where do we use it before?

Contributor

It's used in Utils.createDirectory, which is only used in standalone cluster mode.

private def canCreateFile(file: File): Boolean = {
try {
file.createNewFile()
Contributor

Is it OK to leave the file created? This is different from before.

Contributor

This is a really good question. It looks safe to me; I don't think any of the writers check for file existence before opening a FileOutputStream etc. I also checked IndexBlockResolver, which has some checks on pre-existing files -- but those are checks on the final destination files, not the temporary intermediate files.

Member Author

Yes, exactly one temp file is created, or none at all, both before and after this change.

Contributor

@squito left a comment

I see what you're trying to do here, but does this really buy you much? If you have one bad disk, even if you can prevent temporary files from going to that disk, the final destination files still have a really high chance of going to that disk, don't they?

@yaooqinn
Member Author

> I see what you're trying to do here, but does this really buy you much? If you have one bad disk, even if you can prevent temporary files from going to that disk, the final destination files still have a really high chance of going to that disk, don't they?

@squito Yes, since the temp file name is random, it still has a chance to land on that bad disk. But with 10 max retries, the probability becomes very low. And it is worth preventing a task from failing and being rescheduled after it has finished all of its computation, right before the commit, especially when the task is heavy or skewed.

In our 2000-node Hadoop cluster, with 12 disks per node, this approach has reduced the number of such exceptions a lot.
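As a rough back-of-the-envelope illustration (my numbers, not from the PR), with 12 local dirs, one of them bad, and each retry picking an effectively independent random dir via a fresh UUID-based temp name, the chance that every attempt lands on the bad dir drops from about 1/12 to roughly (1/12)^10:

```scala
val dirs = 12                                        // local dirs per node
val pBadSinglePick = 1.0 / dirs                      // ≈ 0.083: one pick, no retry
val pBadAllAttempts = math.pow(pBadSinglePick, 10)   // ≈ 1.6e-11 if all 10 picks hit the bad dir
```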

@cloud-fan
Contributor

@yaooqinn I think @squito was asking about the final file, which has a fixed name. How do you avoid putting the final file in bad disks?

@yaooqinn
Member Author

yaooqinn commented Oct 29, 2019

@cloud-fan @squito Sorry for the off-target answer. Yes, the final files are not handled here, because they have fixed names for each task attempt (since 3.0.0). If we fail with an exception on a final file, the driver has to reschedule the task.
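For context, a simplified, hypothetical sketch of why a fixed file name always lands in the same directory (this mirrors the spirit of DiskBlockManager's hashing, not its exact code): the target dir is a pure function of the file name, so no amount of retrying can move a final shuffle file off a bad disk.

```scala
import java.io.File

// Same name in => same directory out; only a random temp name gets a new roll of the dice.
def pickDir(filename: String, localDirs: Array[File], subDirsPerLocalDir: Int): File = {
  val hash = filename.hashCode & Int.MaxValue             // non-negative hash of the name
  val dirId = hash % localDirs.length                     // fixed name -> fixed local dir
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(localDirs(dirId), "%02x".format(subDirId))     // e.g. .../blockmgr-xxx/0a
}
```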

@cloud-fan
Contributor

This makes sense to me. Temp files are not only used by the shuffle, but also by the external sorter, the external hash map, etc. So avoiding placing temp files on bad disks does help. @squito what do you think?

@tgravescs
Contributor

> In our 2000-node Hadoop cluster, with 12 disks per node, this approach has reduced the number of such exceptions a lot.

So the only time Hadoop should expose this bad disk to you is if YARN doesn't detect it, or if it goes bad while the container is running. YARN has a specific feature to detect bad disks and will not hand them to a container if they are bad. So in your case, are your executors very long running? Are you using that YARN feature?
I'm not necessarily against this idea, since disks can go bad while executors are running, but I just want to check how much this is really happening. What happens when we go to rename/merge the temp file to its final location? The shuffle file name is static, so it should hash to the same dir every time unless we are adding a different dir; I can't remember that code off the top of my head. With the external shuffle service, the application registers which directories it is using, so that the external shuffle service can use those to find the files again. I'm wondering whether the temp files might work but then fail later on the static names.

@squito
Contributor

squito commented Oct 29, 2019

OK, I can see how it can help a little to add this safety for temporary files. It doesn't really prevent task failure completely, but it'll help avoid it sometimes, anyway.

@tgravescs I don't think YARN's disk checker will help that much here -- as you've said, it only matters when starting a new container, and executors tend to survive past one disk failure. Blacklisting should still give you correctness even without the fix here -- but this is a simple heuristic that might work sometimes, even if it is not guaranteed to.

@tgravescs
Contributor

Yes, it depends on how often your executors are created/destroyed. If you are using dynamic allocation with a lot of long tail, executors could be cycled fairly often and the YARN disk checker should help; if not, it won't. For lots of jobs it won't help by itself.

I mostly wonder how much this helps, because it might help with the temp shuffle file but then fail later when creating the real shuffle file. Is that OK? Maybe, but it potentially changes failing fast into failing later, and if there is a long time between the two you could end up taking longer overall. There are a lot of "mights" in that sentence, and I don't have any concrete idea how much it will help or hurt.
Has this actually been run on real jobs, and have you seen a benefit?

@yaooqinn
Member Author

> So the only time Hadoop should expose this bad disk to you is if YARN doesn't detect it, or if it goes bad while the container is running. YARN has a specific feature to detect bad disks and will not hand them to a container if they are bad. So in your case, are your executors very long running? Are you using that YARN feature?

The YARN disk health check is on. Yes, it helps a lot across our 40k-50k daily Spark jobs. But it does not help much for those with long-lived (30 min or longer) executors, which tend to be big ETL jobs.

> What happens when we go to rename/merge the temp file to its final location? The shuffle file name is static, so it should hash to the same dir every time unless we are adding a different dir.

When using multiple disks, the temp file and the final file are likely to pick different disks. If the final one picks the bad disk, the task attempt is doomed to fail. But if only the temp one does, it is still worth saving.

> Yes, it depends on how often your executors are created/destroyed. If you are using dynamic allocation with a lot of long tail, executors could be cycled fairly often and the YARN disk checker should help; if not, it won't. For lots of jobs it won't help by itself.

Most of our Spark jobs have dynamic allocation on. But executor recycling is not granular enough to handle disk failures, as YARN's disk check is just a periodic task.

> Is that OK? Maybe, but it potentially changes failing fast into failing later, and if there is a long time between the two you could end up taking longer overall.

This can happen only if the temp and final files pick the same disk. And compared to rescheduling an entire task, how long can a single rename/move of a file take, whether it succeeds or fails?

> Has this actually been run on real jobs, and have you seen a benefit?

We have applied this to that cluster for more than 3 months. I have not gathered precise statistics for this exception, but before this change users came to us for help with this kind of failure once every 2 or 3 days on average. Since then, there have been none.

@tgravescs
Contributor

Thanks for the details; it sounds like anecdotally this has helped. I'm OK with the idea of the change.

rootDir0.setExecutable(false)
val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
assert(tempShuffleFile2.exists(),
Contributor

It's possible that after 10 retries we are still not able to find a healthy disk. Can we think of a way to remove the flakiness from this test?

Member Author

@yaooqinn Oct 31, 2019

Add a precondition for this assert, e.g. only assert if tempShuffleFile2's parent belongs to rootDir1?
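A minimal sketch of that guard, using the rootDir1, diskBlockManager and tempShuffleFile2 names from the surrounding test snippet (illustrative only):

```scala
// Only assert when the retry actually escaped the disabled root dir.
if (tempShuffleFile2.getParentFile.getCanonicalPath.startsWith(rootDir1.getCanonicalPath)) {
  assert(tempShuffleFile2.exists(),
    "the temp shuffle file should have been pre-touched on the healthy dir")
}
```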

@SparkQA

SparkQA commented Oct 31, 2019

Test build #112985 has finished for PR 25962 at commit dce7bb8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rootDir0.setExecutable(false)
val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
// It's possible that after 10 retries we still not able to find the healthy disk. we need to
Member Author

@cloud-fan please check whether this is ok or not

Contributor

@jiangxb1987 left a comment

LGTM

@yaooqinn
Member Author

@jiangxb1987 thanks a lot.

@yaooqinn requested a review from cloud-fan October 31, 2019 13:34
@yaooqinn
Member Author

yaooqinn commented Nov 1, 2019

@jiangxb1987 Can we have this merged? It looks approved by all the reviewers here. Thank you very much.

@cloud-fan closed this in 8cf76f8 Nov 4, 2019
@cloud-fan
Contributor

thanks, merging to master!

tempShuffleFile = getFile(blockId)
count += 1
}
(blockId, tempShuffleFile)
Contributor

Sorry, I should have mentioned this earlier -- these two functions are exactly the same except for the constructor, right? They could have been refactored. (Not a big deal, and not worth a follow-up just for this.)
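For reference, one possible shape of that refactor -- a sketch only, assuming the DiskBlockManager members from this PR (getFile, canCreateFile, MAX_DIR_CREATION_ATTEMPTS) and Spark's TempShuffleBlockId/TempLocalBlockId:

```scala
// Shared retry loop; each caller only supplies how to mint a fresh block id.
private def createTempBlock[T <: BlockId](newBlockId: () => T): (T, File) = {
  var blockId = newBlockId()
  var file = getFile(blockId)
  var count = 0
  while (!canCreateFile(file) && count < MAX_DIR_CREATION_ATTEMPTS) {
    blockId = newBlockId()          // fresh UUID -> very likely a different sub dir
    file = getFile(blockId)
    count += 1
  }
  (blockId, file)
}

// def createTempShuffleBlock(): (TempShuffleBlockId, File) =
//   createTempBlock(() => TempShuffleBlockId(UUID.randomUUID()))
// def createTempLocalBlock(): (TempLocalBlockId, File) =
//   createTempBlock(() => TempLocalBlockId(UUID.randomUUID()))
```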

@yaooqinn deleted the SPARK-29285 branch November 4, 2019 15:15
@hvanhovell
Contributor

@yaooqinn this is causing a performance regression for short running queries that only write to a few shuffle files, because now every shuffle file is created eagerly. In this case I think the cure is worse than the disease, so I think we should revert this. cc @cloud-fan @tgravescs @squito

@tgravescs
Contributor

To clarify: you are saying that many of your tasks don't actually create shuffle files, so the fact that they get created eagerly is causing overhead? Can you quantify how much?

@hvanhovell
Contributor

@tgravescs Yeah, I am. #28072 quantifies how much the overhead can be.

cloud-fan pushed a commit that referenced this pull request Mar 31, 2020
…ed by creating temporary file eagerly

### What changes were proposed in this pull request?
This reverts commit 8cf76f8. #25962

### Why are the changes needed?
In SPARK-29285, we changed to create shuffle temporary files eagerly. This is helpful for not failing the entire task in the scenario of an occasional disk failure. But for applications where many tasks don't actually create shuffle files, it caused overhead. See the benchmark below:
Env: Spark local-cluster[2, 4, 19968], each query runs 5 rounds, each round 5 times.
Data: TPC-DS scale=99 generated by spark-tpcds-datagen
Results:
|     | Base                                                                                        | Revert                                                                                      |
|-----|---------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------|
| Q20 | Vector(4.096865667, 2.76231748, 2.722007606, 2.514433591, 2.400373579)  Median 2.722007606  | Vector(3.763185446, 2.586498463, 2.593472842, 2.320522846, 2.224627274)  Median 2.586498463 |
| Q33 | Vector(5.872176321, 4.854397586, 4.568787136, 4.393378146, 4.423996818)  Median 4.568787136 | Vector(5.38746785, 4.361236877, 4.082311276, 3.867206824, 3.783188024)  Median 4.082311276  |
| Q52 | Vector(3.978870321, 3.225437871, 3.282411608, 2.869674887, 2.644490664)  Median 3.225437871 | Vector(4.000381522, 3.196025108, 3.248787619, 2.767444508, 2.606163423)  Median 3.196025108 |
| Q56 | Vector(6.238045133, 4.820535173, 4.609965579, 4.313509894, 4.221256227)  Median 4.609965579 | Vector(6.241611339, 4.225592467, 4.195202502, 3.757085755, 3.657525982)  Median 4.195202502 |

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #28072 from xuanyuanking/SPARK-29285-revert.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Mar 31, 2020
…ed by creating temporary file eagerly

(cherry picked from commit 07c5078)
Signed-off-by: Wenchen Fan <[email protected]>
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…ed by creating temporary file eagerly