
Conversation

@liupc

@liupc liupc commented Jan 22, 2019

What changes were proposed in this pull request?

We encountered an application failure in our production cluster that was caused by a single broken disk.

Job aborted due to stage failure: Task serialization failed: java.io.IOException: Failed to create local dir in /home/work/hdd5/yarn/c3prc-hadoop/nodemanager/usercache/h_user_profile/appcache/application_1463372393999_144979/blockmgr-1f96b724-3e16-4c09-8601-1a2e3b758185/3b.
org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
org.apache.spark.storage.DiskStore.contains(DiskStore.scala:173)
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:391)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:801)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:629)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:987)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:99)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1332)
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:863)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1090)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14$$anonfun$apply$1.apply(DAGScheduler.scala:1086)
scala.Option.foreach(Option.scala:236)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1086)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$14.apply(DAGScheduler.scala:1085)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1085)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1528)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1493)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1482)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

We have configured yarn.nodemanager.local-dirs with multiple directories mounted on different disks; however, the application still failed because of a single broken disk. I think this is because Spark currently does not handle broken disks in DiskBlockManager, even though multiple disk directories are almost always configured in a production environment.

We can probably work around this error by increasing settings such as spark.yarn.maxAppAttempts, but other applications may keep hitting the same problem, which is annoying and makes jobs unstable. Also, for large jobs the retry brings downsides; the major one is delayed application completion (the extra recomputation when ShuffleMapTasks are retried, plus some extra scheduling delay).

Actually, we can handle this case with a small amount of code, since localDirs are already mounted on multiple disks in most production environments.

This PR supports blacklisting a bad disk directory and switching to a good directory for writing, which improves robustness.
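A minimal sketch of the idea, for illustration only (this is not the actual diff; the class name, the badDirs map, and blacklistTimeoutMs are assumptions):

```scala
import java.io.{File, IOException}

// Illustrative sketch: pick a local dir for a block, skipping blacklisted dirs,
// and blacklist a dir for a while when writes to it fail.
class DiskDirSelector(localDirs: Array[File], blacklistTimeoutMs: Long) {
  // dir -> time at which its blacklist entry expires
  private val badDirs = scala.collection.mutable.HashMap.empty[File, Long]

  def pickDir(hash: Int): File = synchronized {
    val now = System.currentTimeMillis()
    badDirs.retain { case (_, expiry) => expiry > now }  // drop expired entries
    val candidates = localDirs.filterNot(badDirs.contains)
    if (candidates.isEmpty) {
      throw new IOException("All local directories are blacklisted")
    }
    candidates(hash % candidates.length)
  }

  // Called when creating a file under `dir` fails with an IOException:
  // blacklist the directory so the caller can retry with another one.
  def markBad(dir: File): Unit = synchronized {
    badDirs(dir) = System.currentTimeMillis() + blacklistTimeoutMs
  }
}
```

Note that filtering the candidate list changes which directory a given hash maps to; the review discussion below covers the resulting consistency concerns (blocks written before the failure, and the external shuffle service's view of the directories).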

How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

@liupc liupc force-pushed the Handle-bad-disk-in-DiskBlockManager branch from f10fbb9 to b6a51e8 Compare January 23, 2019 06:42
@liupc liupc changed the title [SPARK-26689]Handle bad disk in DiskBlockManager [SPARK-26689]Support bad disk directory blacklist and retry in DiskBlockManager Jan 23, 2019
@liupc liupc changed the title [SPARK-26689]Support bad disk directory blacklist and retry in DiskBlockManager [SPARK-26689]Support blacklisting bad disk directory and retry in DiskBlockManager Jan 23, 2019
@liupc liupc force-pushed the Handle-bad-disk-in-DiskBlockManager branch from 6ea3b97 to c8cdde5 Compare January 28, 2019 05:30
@liupc
Author

liupc commented Jan 28, 2019

cc @srowen @vanzin @HyukjinKwon @squito @dongjoon-hyun
Could anybody take a look at this PR and give some suggestions?

@liupc liupc changed the title [SPARK-26689]Support blacklisting bad disk directory and retry in DiskBlockManager [SPARK-26689][CORE]Support blacklisting bad disk directory and retry in DiskBlockManager Jan 28, 2019
@squito
Contributor

squito commented Jan 28, 2019

Did you have blacklisting turned on when you ran into your failure? That seems like a more robust solution.

@liupc
Author

liupc commented Jan 29, 2019

@squito Yes, it's turned on. I think the exception I attached is not covered by the application-level blacklist (that only handles task failures); this is a driver-side exception, thrown even before the TaskSetManager is created.
So this PR introduces a disk-directory blacklist to handle this case, which can avoid broadcast failures and shuffle retries when a disk error is encountered. This benefits applications and makes them more stable, and everything keeps working as usual until SREs fix the disk.

@squito
Contributor

squito commented Jan 29, 2019

Oh I see, this is handling a bad disk on the driver, not the executors. A bad disk on the executors should be handled by blacklisting.

In general, Spark's fault tolerance for the driver is extremely poor; it's a single point of failure for lots of reasons. Still, this may be a small fix that improves things somewhat, without any real guarantees.

There are a lot of cases here to think through carefully. E.g., as noted in the comments, this has to be kept in sync with ExternalShuffleBlockResolver#getFile. Even if you put in the same logic, it's still possible they'll end up with different views of the actual bad directories. You probably also want to update getAllFiles() to respect the badDirs. And when a dir with lots of data in it is suddenly marked as bad, make sure all other internal state is updated reasonably (maybe not immediately, but in a way that makes sense when it is updated).

I'm just thinking aloud about this change ... still on the fence about whether it's good or not.
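For context, both DiskBlockManager.getFile and the external shuffle service place a file by hashing its name over the configured local dirs, roughly like the simplified sketch below (subdirectory creation and locking omitted). If one side filters out a bad dir and the other does not, the same filename resolves to different directories on the two sides:

```scala
import java.io.File
import org.apache.spark.util.Utils

// Simplified view of how a block file is mapped to a directory today.
def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
  new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
}
```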

var mostRecentFailure: Exception = null
// Update blacklist
val now = System.currentTimeMillis()
badDirs.dropWhile(now > dirToBlacklistExpiryTime(_))
Contributor

This method needs to be thread-safe, so badDirs and dirToBlacklistExpiryTime have to be protected, or that needs to be handled somehow.

Author

@squito Yeah, it should be thread-safe, but is there any case where the getFile method would be called concurrently?
I think it's only called by DiskStore and IndexShuffleBlockResolver, and they seem to use this method in a synchronized way. Is that right?

Author

Oh, I see, this method may be called concurrently when running applications with multiple cores per executor. Is that right?
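One possible way to keep the blacklist bookkeeping safe under concurrent getFile calls is a concurrent map instead of a plain HashMap; a sketch under that assumption (BadDirTracker and its fields are illustrative, not part of the patch):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Sketch: thread-safe blacklist bookkeeping for bad local dirs.
class BadDirTracker(blacklistTimeoutMs: Long) {
  // dir path -> timestamp at which its blacklist entry expires
  private val dirToBlacklistExpiryTime = new ConcurrentHashMap[String, java.lang.Long]()

  def markBad(dir: String, now: Long): Unit =
    dirToBlacklistExpiryTime.put(dir, now + blacklistTimeoutMs)

  def currentBadDirs(now: Long): Set[String] = {
    // Drop expired entries first, then snapshot the remaining bad dirs.
    dirToBlacklistExpiryTime.entrySet().removeIf(e => e.getValue.longValue() <= now)
    dirToBlacklistExpiryTime.keySet().asScala.toSet
  }
}
```

The actual patch could equally just synchronize around the existing mutable structures; the point is only that the check-then-update done in getFile must be atomic with respect to other callers.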

Liupengcheng added 2 commits January 30, 2019 19:46
@Ngone51
Member

Ngone51 commented Jan 30, 2019

Could YarnAllocatorBlacklistTracker handle this issue? @squito @attilapiros

@attilapiros
Contributor

@Ngone51 Unfortunately not, as this is on the driver side (as @squito mentioned above).

@Ngone51
Member

Ngone51 commented Jan 30, 2019

@attilapiros Oh, I see.

Contributor

@attilapiros attilapiros left a comment

I suggest a slightly different test, as the current test's chance of flakiness is quite high.

What about testing all the cases here, both when rootDir0 is good and rootDir1 is bad, and vice versa?

By introducing Clock and using a ManualClock in the test you can do that based on the expiry feature (so even without creating a new DiskBlockManager, which would otherwise be needed since badDirs cannot be cleaned without expiry).
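A rough sketch of such a test, assuming the PR wires a Clock into DiskBlockManager and the suite has helpers to break and repair a root dir (the Clock constructor parameter, makeDirUnwritable/makeDirWritable, and blacklistTimeoutMs are all assumptions here):

```scala
import org.apache.spark.util.ManualClock

test("blacklisted dir is retried after the expiry time") {
  val clock = new ManualClock(0L)
  // Hypothetical constructor overload that accepts a Clock.
  val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true, clock)

  // Break rootDir0 so the first write attempt fails and the dir is blacklisted;
  // the block should then land under rootDir1.
  makeDirUnwritable(rootDir0)
  val file = diskBlockManager.getFile("test_block")
  assert(file.getAbsolutePath.startsWith(rootDir1.getAbsolutePath))

  // Repair the dir and advance the ManualClock past the blacklist expiry, so
  // rootDir0 becomes a candidate again without recreating the DiskBlockManager.
  makeDirWritable(rootDir0)
  clock.advance(blacklistTimeoutMs + 1)
  // Which dir "test_block" maps to now depends on the hash, so only check that
  // the call still succeeds with both dirs back in play.
  assert(diskBlockManager.getFile("test_block") != null)
}
```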

test("test blacklisting bad disk directory") {
val blockId = new TestBlockId("1")
val hash = Utils.nonNegativeHash(blockId.name)
val (badDiskDir, goodDiskDir) = if (hash % 2 == 0) {
Contributor

I guess the purpose of this randomisation when choosing goodDiskDir/badDiskDir from rootDirs is to cover, from run to run, the case where the good dir is not the one chosen first by diskBlockManager.

I would strongly suggest avoiding this, as sometimes the feature will not be tested at all, and a bug introduced here might only be detected while testing an unrelated change.

Author

@attilapiros Good suggestion. I have already updated the UT and also fixed a bug: when getFile is called for a block written before the disk corruption, it would return a different result.

@tgravescs
Contributor

One of the comments above says this is for the driver, but this is going to affect both executors and the driver. I'm a bit on the fence about the change as well, but at this point I think if Spark can be made more robust we should do it. Note that the configs would need to be documented, and I agree with @squito that we would need to look closer at all the cases. This is in getFile and doesn't handle failures after you initially get the file, but it is still an improvement. I think we should also make sure to log enough info so someone debugging can easily see things were blacklisted; perhaps print the number of disks left after blacklisting.

@liupc
Author

liupc commented Feb 10, 2019

@attilapiros @squito any more suggestions?

@squito
Contributor

squito commented Feb 12, 2019

I'm sorry I haven't been able to look at this closely, but I do want to express my reservations about this. I'm worried about what will happen with the external shuffle service when something appears to go wrong in the middle of an application, especially if it's a temporary issue.

I also want to make sure the memory use of the migratedFile structure doesn't grow without bound; the number of files in there could be huge. I guess it's OK if it only grows after a failure, as you're trading a failure right now from the DiskStore for an OOM later on.

Anyway, I'm not blocking this, I just want to make sure this is thought through carefully before it's merged.

@liupc
Author

liupc commented Feb 14, 2019

@squito I understand your concern, and I think you are right: there might be some risk of inconsistent behavior with the external shuffle service, and I need to think carefully about how to solve that problem.
Maybe we should also add the same logic to the external shuffle service.

@AmplabJenkins

Can one of the admins verify this patch?

@srowen srowen closed this Oct 26, 2019