
[SPARK-20713][Spark Core] Convert CommitDenied to TaskKilled. #18070

Closed

lycplus wants to merge 5 commits into apache:master from lycplus:SPARK-20713

Conversation

@lycplus
Contributor

@lycplus lycplus commented May 23, 2017

What changes were proposed in this pull request?

In the executor, CommitDeniedException is converted to TaskKilledException to avoid inconsistent task state, because there is a race between the driver killing a task and the executor attempting to commit.
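
The race in question, sketched in simplified form (the coordinator call and exception shape follow Spark's OutputCommitCoordinator and CommitDeniedException, but the surrounding details are elided and assumed):

// Executor side, simplified: ask the driver-side OutputCommitCoordinator for
// permission to commit. A concurrent kill from the driver, or an earlier
// committer for the same partition, results in a denial.
val canCommit = outputCommitCoordinator.canCommit(stageId, partitionId, attemptNumber)
if (!canCommit) {
  throw new CommitDeniedException(
    s"attempt $attemptNumber denied committing partition $partitionId",
    stageId, partitionId, attemptNumber)
}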

How was this patch tested?

No new tests were added because the change is straightforward.

@lycplus lycplus changed the title Convert CommitDenied to TaskKilled. [SPARK-20713][Spark Core] Convert CommitDenied to TaskKilled. May 23, 2017
@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented May 23, 2017

cc @ericl

      res
    } catch {
      case _: CommitDeniedException =>
        throw new TaskKilledException("commit denied")
Contributor

I'm not sure we want to just convert it here. I was originally thinking we would fix it up on the driver side, because the driver knows that it explicitly killed a speculative task; here we don't know that for sure.

For instance, you might have gotten a commit denied because the stage was aborted due to a fetch failure. That shouldn't show up as killed.

Contributor Author

@lycplus lycplus May 23, 2017

Maybe we should throw a more specific exception for the already-committed case? The executor can detect the already-committed case before it sends the statusUpdate, so we do not need to wait until the driver's statusUpdate handler deals with the commit-denied case.

As to "because the driver knows that it explicitly killed a speculative task", this may not be true. Consider the case where the statusUpdate of the commit-denied task arrives earlier than that of the successful task: then the driver knows nothing yet, and it has to distinguish the already-committed case from other commit-denied cases from the commit-denied statusUpdate alone. This can happen in the following sequence (a sketch of the more specific exception follows the list):

  1. attempt 1 commits successfully
  2. attempt 2's commit is denied
  3. attempt 2's statusUpdate arrives at the driver
  4. attempt 1's statusUpdate arrives at the driver
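
A hypothetical sketch of the "more specific exception" suggested above (AlreadyCommittedException does not exist in Spark; the name and fields are illustrative):

// Hypothetical: a dedicated exception for the already-committed case, so the
// executor can report it precisely instead of the driver having to guess
// from a generic commit-denied statusUpdate.
class AlreadyCommittedException(
    val jobId: Int,
    val partitionId: Int,
    val attemptNumber: Int)
  extends Exception(
    s"partition $partitionId of job $jobId was already committed by " +
    s"another attempt, so attempt $attemptNumber gives up")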

Contributor

Doesn't a stage abort also cause tasks to show up as killed (due to "stage cancelled")?

taskScheduler.cancelTasks(stageId, shouldInterruptThread)

It seems to me that CommitDenied always implies the task is killed, in which case it might be fine to convert all CommitDeniedExceptions into TaskKilled.

Btw, there's a catch block below -- case CausedBy(cDE: CommitDeniedException) => -- which seems like the right place to be doing this handling.

@tgravescs
Copy link
Contributor

Sorry, the case I was talking about is a fetch failure. The true stage abort doesn't happen until it retries 4 times; in the meantime you can have tasks from the same stage (different attempts) running at the same time, because we currently don't kill the tasks from the aborted stage attempt. Although, thinking about it more, having them show up as killed doesn't hurt anything; it just makes a slightly bigger assumption.

@@ -459,7 +459,7 @@ private[spark] class Executor(
        case CausedBy(cDE: CommitDeniedException) =>
          val reason = cDE.toTaskFailedReason
Contributor

We should probably change this to toTaskCommitDeniedReason, since it's not a failure anymore.
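
A sketch of what the renamed handling might look like in the executor (assumed shape pieced together from this thread, not the merged code):

case CausedBy(cDE: CommitDeniedException) =>
  // Report the attempt as KILLED (commit denied) rather than FAILED, so it
  // counts toward neither the task-failure limit nor the failed-task stats.
  val reason = cDE.toTaskCommitDeniedReason
  execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason))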

@lycplus
Contributor Author

lycplus commented Jun 1, 2017

ping @tgravescs

@tgravescs
Contributor

Thanks for the updates. I was testing this out by running a large job with speculative tasks, and I am still seeing the stage summary show failed tasks. It looks like it's due to this code:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala#L396

That code doesn't differentiate the commit-denied reason, so I think we need to handle it there as well so the stats show up properly.

It would also be good to add a unit test for that, where you can look at the stageData to make sure numFailedTasks is what you expect.
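
A hypothetical sketch of the JobProgressListener change being suggested (the match shape and the numKilledTasks/numFailedTasks counters follow the Spark 2.x UI code, but the exact form here is assumed):

taskEnd.reason match {
  case Success =>
    stageData.numCompleteTasks += 1
  // Assumed addition: count commit-denied attempts as killed, not failed.
  case _: TaskKilled | _: TaskCommitDenied =>
    stageData.numKilledTasks += 1
  case _ =>
    stageData.numFailedTasks += 1
}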

@lycplus
Contributor Author

lycplus commented Jun 2, 2017

How about letting TaskCommitDenied and TaskKilled extend the same trait (for example, TaskKilledReason)? (Ignore the last commit for now; having TaskCommitDenied extend TaskKilled directly seems like a bad idea.) This way, when accounting metrics, both TaskCommitDenied and TaskKilled contribute to the killed count rather than the failed count.

@tgravescs
Contributor

Sorry for my delay in getting back to this.
If we do that, you would need TaskKilledReason to extend TaskFailedReason, because things rely on the countTowardsTaskFailures field. Then we would want to go through everywhere those are used and see if we need to update the other code. It doesn't look like there would be too many places to update, so I think this approach sounds good. If you find something that is painful to update, then just updating JobProgressListener in a few places would be fine too.
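
A sketch of the hierarchy being discussed (TaskKilledReason is hypothetical; TaskFailedReason, TaskKilled, and TaskCommitDenied are real Spark types, shown here as simplified stand-ins):

// Simplified stand-in for the real Spark trait, for illustration only.
trait TaskFailedReason {
  def toErrorString: String
  def countTowardsTaskFailures: Boolean = true
}

// Hypothetical trait from this discussion: killed outcomes never count
// toward the task-failure limit.
sealed trait TaskKilledReason extends TaskFailedReason {
  override def countTowardsTaskFailures: Boolean = false
}

case class TaskKilled(reason: String) extends TaskKilledReason {
  override def toErrorString: String = s"TaskKilled ($reason)"
}

case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptNumber: Int)
  extends TaskKilledReason {
  override def toErrorString: String = "TaskCommitDenied (Driver denied task commit)"
}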

@jiangxb1987
Contributor

ping @liyichao Will you address the latest comments from @tgravescs?

@lycplus
Contributor Author

lycplus commented Aug 3, 2017

I will update the PR within a day, updating everywhere TaskKilledReason is used; sorry for the delay.

@tgravescs
Contributor

There is actually another pull request up that does the same thing:
#18819

@lycplus
Contributor Author

lycplus commented Aug 3, 2017

Oh, I did not notice that. Since @nlyu is following up, I will close this PR now.

@lycplus lycplus closed this Aug 3, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Aug 3, 2017
## What changes were proposed in this pull request?

In the executor, toTaskFailedReason is renamed to toTaskCommitDeniedReason to avoid inconsistent task state. In JobProgressListener, a case for TaskCommitDenied is added so that the stage's killed count is incremented instead of its failed count.
This pull request is picked up from: apache#18070 using commit: ff93ade
The case match for TaskCommitDenied, incrementing the correct number of killed tasks, was added after pull/18070.

## How was this patch tested?

Run a normal job with speculative execution enabled and check the stage UI page; no failed tasks should be displayed.

Author: louis lyu <llyu@c02tk24rg8wl-lm.champ.corp.yahoo.com>

Closes apache#18819 from nlyu/SPARK-20713.