[SPARK-19276][CORE] Fetch Failure handling robust to user error handling #16639
Fault-tolerance in Spark requires special handling of shuffle fetch failures. The Executor would catch FetchFailedException and send a special msg back to the driver. However, intervening user code could intercept that exception and wrap it with something else. This even happens in SparkSQL. So rather than checking only the thrown exception, we'll store the fetch failure directly in the TaskContext, where users can't touch it. This includes a test case which failed before the fix.
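The core idea, as a simplified self-contained model (these are stand-ins for illustration, not the real org.apache.spark classes):

```scala
// Simplified model: the exception registers itself in the thread-local task context
// at construction time, so the Executor can consult the context rather than relying
// on whatever exception (if any) finally escapes the user code.
object TaskContext {
  private val local = new ThreadLocal[TaskContext]
  def get(): TaskContext = local.get()
  def set(ctx: TaskContext): Unit = local.set(ctx)
}

class TaskContext {
  @volatile var fetchFailed: Option[FetchFailedException] = None
  def setFetchFailed(e: FetchFailedException): Unit = { fetchFailed = Some(e) }
}

class FetchFailedException(msg: String) extends Exception(msg) {
  // On construction, record the failure where user code cannot hide it.
  Option(TaskContext.get()).foreach(_.setFetchFailed(this))
}
```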
Test build #71636 has finished for PR 16639 at commit
cc @kayousterhout @markhamstra @mateiz This isn't just protecting against crazy user code -- I've seen users hit this with Spark SQL (because of spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala, line 214 at commit 278fa1e).
I attempted to write a larger integration test, which reproduced the issue in a "local-cluster" setup, but got stuck. ShuffleBlockFetcherIterator does some fetches on construction, before it's used as an iterator wrapped in user code. So if the failures happen during that initialization, everything was fine even before this change. The failure has to happen inside the call to next().
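For illustration, a hypothetical user-code pattern that hides the failure (MyAppException and the RDD element types here are made up for the example):

```scala
import org.apache.spark.rdd.RDD

// Hypothetical application exception that wraps whatever was thrown.
class MyAppException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

def consumeWithUserErrorHandling(shuffled: RDD[(Int, Int)]): RDD[(Int, Int)] =
  shuffled.mapPartitions { iter =>
    val results = try {
      // Consuming the iterator calls next(), which is where a late
      // FetchFailedException surfaces -- inside the user's try block.
      iter.toArray
    } catch {
      // The fetch failure gets wrapped, so the Executor (before this patch)
      // could no longer recognize it by inspecting the thrown exception.
      case t: Throwable => throw new MyAppException("user wrapper", t)
    }
    results.iterator
  }
```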
Test build #71637 has finished for PR 16639 at commit
Test build #71638 has finished for PR 16639 at commit
Test build #71640 has finished for PR 16639 at commit
mridulm
left a comment
Thanks for the patch @squito, fixing this should definitely make Spark more robust.
    // which intercepts this exception (possibly wrapping it), the Executor can still tell there was
    // a fetch failure, and send the correct error msg back to the driver. The TaskContext won't be
    // defined if this is run on the driver (just in test cases) -- we can safely ignore then.
    Option(TaskContext.get()).map(_.setFetchFailed(this))
Since creating an Exception does not necessarily mean it will be thrown, we must explicitly add this expectation to the documentation/contract of the FetchFailedException constructor -- indicating that we expect it to be created only when it will be thrown immediately.
This should be fine since FetchFailedException is private[spark] right now.
yes, good point. I added to the docs, does it look OK?
I also considered making the call to TaskContext.setFetchFailed live outside of the constructor, so that it would have to be called at each site where the exception was created -- but that seemed more dangerous.
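For reference, the added constructor note presumably reads along these lines (a paraphrased sketch, not the verbatim scaladoc from the patch):

```scala
/**
 * Failed to fetch a shuffle block. The constructor registers this exception with the
 * running TaskContext, so an instance must only be created at the point where it will
 * be thrown immediately -- never speculatively or for later use. This is acceptable
 * to require while the class remains private[spark].
 */
```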
    // Whether the task has failed.
    @volatile private var failed: Boolean = false
    ...
    var fetchFailed: Option[FetchFailedException] = None
@volatile private ?
    // and threw something else. Regardless, we treat it as a fetch failure.
    val reason = task.context.fetchFailed.get.toTaskFailedReason
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
nit: Probably log a similar message as above ?
do you mean the msg I added about "TID ${taskId} completed successfully though internally it encountered unrecoverable fetch failures!"? I wouldn't think we'd want to log anything special here. I'm trying to make this a "normal" code path. The user is allowed to do this. (sparksql already does.)
we could log a warning, but then this change should be accompanied by auditing the code and making sure we never do this ourselves.
Yes, something along those lines ...
And I agree, we should not be doing this ourselves as well.
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    ...
    case t: Throwable if task.context.fetchFailed.isDefined =>
task and task.context can be null in case the exception is thrown before/while deserializing the task, or before the task is run (or if initialization of the context in task.run fails).
In any of these cases, the if condition here will result in an NPE, and needs to be fixed.
oh, great point! sorry I missed that. I've added a test case for this as well.
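A plausible shape for that guard (the later diffs reference a hasFetchFailure helper; its exact body is assumed here):

```scala
// True only if the task, its context, and a recorded fetch failure all exist; task
// and task.context can be null when the failure happened before or during task
// deserialization, which is exactly the NPE pointed out above.
private def hasFetchFailure: Boolean =
  task != null && task.context != null && task.context.fetchFailed.isDefined
```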
thanks for the feedback @mridulm, all good points. I pushed an update to address some of the points, and also have some follow-up discussion.
Test build #71669 has finished for PR 16639 at commit
Test build #71668 has finished for PR 16639 at commit
Test build #71673 has finished for PR 16639 at commit
    }
    ...
    private[spark] override def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
      this._fetchFailed = Some(fetchFailed)
minor: Option(fetchFailed)
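i.e. prefer Option(fetchFailed), which collapses a null argument to None, where Some(fetchFailed) would happily wrap the null:

```scala
private[spark] override def setFetchFailed(fetchFailed: FetchFailedException): Unit = {
  // Option(...) yields None for a null argument, unlike Some(...)
  this._fetchFailed = Option(fetchFailed)
}
```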
    val serTask = serializer.serialize(task)
    val taskDescription = fakeTaskDescription(serTask)


nit: too many empty lines
    executor.launchTask(mockBackend, taskDescription)
    val startTime = System.currentTimeMillis()
    val maxTime = startTime + 5000
    while (executor.numRunningTasks > 0 && System.currentTimeMillis() < maxTime) {
I'd use eventually here, or at least System.nanoTime instead.
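For instance, with ScalaTest's Eventually trait (a sketch; the timeout and interval values are illustrative):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll until the launched task finishes, failing the test after 5 seconds. Unlike a
// hand-rolled currentTimeMillis loop, this is not affected by wall-clock adjustments.
eventually(timeout(5.seconds), interval(10.milliseconds)) {
  assert(executor.numRunningTasks == 0)
}
```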
kayousterhout
left a comment
I did a super quick review of this and have some high level thoughts (will do a more thorough review pending your thoughts on the things below):
(1) Instead of this approach, did you consider walking through the exceptions (with getCause()) to see if there's a nested FetchFailure in there? That seems simpler, with the con of missing scenarios where the user discards the initial exception entirely. Not sure how likely that is? The current approach is definitely more defensive towards bad user code, but I'm hesitant about the amount of added complexity.
(2) For testing, does it help to pass in a much smaller maxBytesToFetch (spark.reducer.maxSizeInFlight) to ShuffleBlockFetcherIterator to limit the size of the initial fetches, to make it easier to wrap the FetchFailed when you want to?
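A sketch of the cause-walking alternative from (1) (hypothetical helper, not in the patch; see the finally discussion below for why it was ultimately rejected):

```scala
import scala.annotation.tailrec
import org.apache.spark.shuffle.FetchFailedException

// Walk the getCause chain looking for a buried FetchFailedException. This misses any
// case where user code drops the original exception entirely (e.g. a throw from a
// finally block), which is why the PR stores the failure in the TaskContext instead.
@tailrec
def findFetchFailure(t: Throwable): Option[FetchFailedException] = t match {
  case null => None
  case ff: FetchFailedException => Some(ff)
  case other => findFetchFailure(other.getCause)
}
```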
    // If there was a fetch failure in the task, we store it here, to make sure user-code doesn't
    // hide the exception. See SPARK-19276
    @volatile private var _fetchFailed: Option[FetchFailedException] = None
minor: can you more verbosely call this _fetchFailedException, so it's more obvious that it's not a boolean variable (like the above failed variable)
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    ...
    case t: Throwable if hasFetchFailure =>
Can the above case be eliminated with the addition of this one?
yeah, as this is now, you could eliminate this -- I left it separate for now just to highlight that we can differentiate two special cases, which we could handle in a few different ways.
(1) FetchFailed is thrown, and the task fails, but it's not the outer-most exception. It seems clear in this case that we should fail the task with a FetchFailure. But do we also want to log an error or something indicating bad user code? Kinda minor, but might be a good idea. (Suggested by @mridulm above as well, I think.)
(1a) Or the FetchFailed isn't part of the thrown exception at all. As I mentioned in my response to your other question, I'd like to consider this exactly the same as (1).
(2) FetchFailed is thrown, but totally swallowed, so the task succeeds. Should we succeed the task, or fail it? I don't really know how this would happen. It seems really unlikely the user meant to do this. But then again, maybe the user did? I chose to just log an error but still succeed the task. (@markhamstra commented about this on the jira as well.)
It's pretty easy to change the code for whatever the desired behavior is, just waiting for a clear decision.
I agree with Mridul's comment on (1) (that it would be nice to log a warning in this case) and your assessment of (2). To handle (1), you could have just this one case, and then log a warning if !t.isInstanceOf[FetchFailedException]
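In other words, collapse to a single case, roughly like this (a fragment of the Executor's failure-handling match, sketched from the diff hunks above rather than the verbatim patch):

```scala
case t: Throwable if hasFetchFailure =>
  // Covers both a directly thrown FetchFailedException and one hidden by user code.
  if (!t.isInstanceOf[FetchFailedException]) {
    logWarning(s"TID $taskId: fetch failure was hidden by another exception", t)
  }
  val reason = task.context.fetchFailed.get.toTaskFailedReason
  setTaskFinishedAndClearInterruptStatus()
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
```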
    memoryManager.synchronized { memoryManager.notifyAll() }
    }
    } finally {
    // though we unset the ThreadLocal here, the context itself is still queried directly
nit: "the context member variable" instead of just "the context" (took me a min to parse this)
yeah, I considered it but chose this for exactly the reason you suggest, to be more defensive. It just seems way too easy for a user to get this wrong. Even assuming the user always wraps the exception, what about bad error handling in a

    try {
      ...
    } catch {
      case t: Throwable => throw new MySpecialAppException(t)
    } finally {
      someResource.close() // oops, this can throw an exception
    }

Given the special importance of this exception, it really seems like we should be handling it in some way that bad user code can't cover it up.
Do you mean for writing a larger integration test? That would probably help make the failure more likely, but off the top of my head, I don't think that will be enough to reliably trigger the real failure (which means if it's ever broken, we'll just think it's a flaky test, not broken functionality). IIRC the problem isn't just having more fetches than fit in the initial requests -- you also need the fetch failure to occur at a specific point in the processing. I'd need to futz around with it a while again to say for sure, though. I'd certainly like an integration test, but eventually decided the unit test I added was sufficient.
Walking up a getCause tree is not reliable -- finally is one of the cases where it will fail (others being catch blocks ignoring it, catch-rethrow idioms resulting in other exceptions being thrown, etc).
Ok, I'm convinced re: not walking up the cause tree. I didn't think about that finally case. I'll do another review now. Re: the larger integration test, I didn't have a particular thing in mind -- I was mentioning that in response to your comment that it was hard to trigger the failure because of that first set of blocks that's fetched before any next() calls (so I was hoping you had some integration test in mind that would leverage that). But fine not to have one if it's still too complex to do.
kayousterhout
left a comment
The approach here looks good. I was hoping there might be a way to simplify but I couldn't think of anything (and the commenting is helpful for noting why stuff needed to be added)
    // unlikely). So we will log an error and keep going.
    logError(s"TID ${taskId} completed successfully though internally it encountered " +
      s"unrecoverable fetch failures! Most likely this means user code is incorrectly " +
      s"swallowing Spark's internal exceptions", fetchFailure)
Can you be explicit about what exception is getting swallowed here? (i.e., "incorrectly swallowing Spark's internal FetchFailedException") -- to possibly simplify debugging/fixing this issue for a user who runs into it.
    case t: Throwable if hasFetchFailure =>
      // there was a fetch failure in the task, but some user code wrapped that exception
      // and threw something else. Regardless, we treat it as a fetch failure.
      val reason = task.context.fetchFailed.get.toTaskFailedReason
tiny nit: but does it make sense to store the taskFailedReason (rather than the actual exception) in the task context?
    }
    }
    ...
    test("SPARK-19276: Handle Fetch Failed for all intervening user code") {
how about "Handle FetchFailedExceptions that are hidden by user exceptions"?
    val taskDescription = fakeTaskDescription(serTask)
    ...
    val failReason = runTaskAndGetFailReason(taskDescription)
can you add a comment about what's going on here? I think the FFE gets thrown because the shuffle map data was never generated? And then you're checking that it's correctly accounted for, even though the user RDD code wrapped the exception in something else?
    assert(failReason.isInstanceOf[FetchFailed])
    }
    ...
    test("Gracefully handle error in task deserialization") {
is this test related to this PR? (seems useful but like it should be in its own PR?)
@mridulm pointed out this bug in an earlier version of this pr, so I fixed the bug and added a test case. But in any case, I've separated this out into #16930 / https://issues.apache.org/jira/browse/SPARK-19597
    }
    }
    ...
    class FakeShuffleRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
how about FetchFailureThrowingShuffleRDD? (to make it obvious what the point of this is?)
    }
    }
    ...
    private def mockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = {
can this and the below method have a verb in the name, since they're doing something rather than just getters? createMockEnv?
| logWarning(s"TID ${taskId} encountered a ${classOf[FetchFailedException]} and " + | ||
| s"failed, but did not directly throw the ${classOf[FetchFailedException]}. " + | ||
| s"Spark is still handling the fetch failure, but these exceptions should not be " + | ||
| s"intercepted by user code.") |
@mridulm @kayousterhout how is this msg? open to other suggestions. I'm not sure exactly what to recommend to the user instead.
I worry that this is slightly misleading because there's not necessarily anything bad happening here (e.g., in the SQL case), and the user-thrown exception is getting permanently lost. What about something more like

    logWarning(s"TID ${taskId} encountered a ${classOf[FetchFailedException]} and " +
      s"failed, but the ${classOf[FetchFailedException]} was hidden by another " +
      s"exception: $t. Spark is handling this like a fetch failure and ignoring the " +
      s"other exception.")
@kayousterhout While I like the message, Spark SQL should not be catching that exception to begin with anyway.
Btw, the impact of ignoring the exception here needs to be considered as well ... the "catch Throwable" block does some interesting things for accumulator updates and isFatalError.
At least the latter must be handled here (an OOM being raised, for example) -- not sure about the accumulator updates ...
I agree with @mridulm that it looks like these lines (473-475 below) need to be added here:

    if (Utils.isFatalError(t)) {
      SparkUncaughtExceptionHandler.uncaughtException(t)
    }

I'm less sure about the accumulator updates. It looks like the old code doesn't report accumulators for fetch failed exceptions, but it's not clear to me why we'd report them for some kinds of exceptions but not others. The simplest thing to do seems to be the current approach (since it roughly maintains the old behavior of not updating accumulators for fetch failures) but I don't have a good sense for why this is or is not correct.
Thanks, I like that msg better. I changed it slightly so the original exception is at the end, otherwise it's hard to tell where the original exception ends and you are back to the error msg. Here's what the new msg looks like from the test case now:

    17/02/27 16:33:43.953 Executor task launch worker for task 0 WARN Executor: TID 0 encountered a org.apache.spark.shuffle.FetchFailedException and failed, but the org.apache.spark.shuffle.FetchFailedException was hidden by another exception. Spark is handling this like a fetch failure and ignoring the other exception: java.lang.RuntimeException: User Exception that hides the original exception

You have a good point about the uncaught exception handler, I have added that back. I wondered whether I should add those lines inside the case t: Throwable if hasFetchFailure block, or make it a condition for the case itself: case t: Throwable if hasFetchFailure && !Utils.isFatalError(t). I decided to make it part of the condition, since that is more like the old behavior, and a fetch failure that happens during an OOM may not be real.
I also looked into adding a unit test for this handling -- it requires some refactoring, potentially more work than it's worth, so I put it in a separate commit.
I'd rather avoid changing the behavior for accumulators here. Accumulators have such weird semantics it's not clear what they should do; we can fix that separately if we really want to.
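With the fatal-error concern folded in, the guard described above becomes (a sketch of the final shape, mirroring the condition quoted in the comment):

```scala
// A fetch failure recorded while a fatal error (e.g. an OOM) is propagating may not
// be real, so fatal errors skip this case and fall through to the existing handler
// that invokes SparkUncaughtExceptionHandler, matching the old behavior.
case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
```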
Test build #72883 has finished for PR 16639 at commit
Test build #72882 has finished for PR 16639 at commit
Jenkins, retest this please
Test build #72900 has finished for PR 16639 at commit
kayousterhout
left a comment
A few last things. Also looks like this duplicates some of the functionality in #16930, which can be fixed when that's merged
| logWarning(s"TID ${taskId} encountered a ${classOf[FetchFailedException]} and " + | ||
| s"failed, but did not directly throw the ${classOf[FetchFailedException]}. " + | ||
| s"Spark is still handling the fetch failure, but these exceptions should not be " + | ||
| s"intercepted by user code.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I worry that this is slightly misleading because there's not necessarily anything bad happening here (e.g., in the SQL case), and the user-thrown exception is getting permanently lost. What about something more like
logWarning(s"TID ${taskId} encountered a ${classOf[FetchFailedException]} and " +
s"failed, but the ${classOf[FetchFailedException]} was hidden by another " +
s"exception: $t. Spark is handling this like a fetch failure and ignoring the " +
s"other exception.")
    // SPARK-19276. We set the fetch failure in the task context, so that even if there is user-code
    // which intercepts this exception (possibly wrapping it), the Executor can still tell there was
    // a fetch failure, and send the correct error msg back to the driver. The TaskContext won't be
    // defined if this is run on the driver (just in test cases) -- we can safely ignore then.
This last sentence is confusing. A task that runs locally on the driver can still hit fetch failures right? Or are you saying the TaskContext will only be not defined in test cases?
sorry, I've reworded this. The issue is that we have test cases where the TaskContext isn't defined, and so we'd hit an NPE without the Option wrapper. But in general, the TaskContext should always be defined anytime we'd create a FetchFailure.
The alternative would be to track down the test cases w/out a TaskContext, and add one back.
Test build #73598 has finished for PR 16639 at commit
Jenkins retest this please (filed https://issues.apache.org/jira/browse/SPARK-19772)
Test build #73597 has finished for PR 16639 at commit
kayousterhout
left a comment
LGTM -- thanks for adding the test for fatal errors
Test build #73607 has finished for PR 16639 at commit
@mridulm look ok to you too? I plan on merging soon. I just made a small change to the comments (I copied and pasted incorrect comments in the last test case I added)
@squito It looks good to me, thanks for the changes!
Test build #73784 has finished for PR 16639 at commit
I merged this into master. Thanks @squito!
What changes were proposed in this pull request?
Fault-tolerance in Spark requires special handling of shuffle fetch
failures. The Executor would catch FetchFailedException and send a
special msg back to the driver.
However, intervening user code could intercept that exception, and wrap
it with something else. This even happens in SparkSQL. So rather than
checking the thrown exception only, we'll store the fetch failure directly
in the TaskContext, where users can't touch it.
How was this patch tested?
Added a test case which failed before the fix. Full test suite via jenkins.