diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index a76283e33fa65..c22b554805e46 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -225,9 +225,8 @@ case class TaskKilled(reason: String) extends TaskFailedReason { case class TaskCommitDenied( jobID: Int, partitionID: Int, - attemptNumber: Int) extends TaskFailedReason { - override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" + - s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber" + attemptNumber: Int) extends TaskKilled(reason = s"TaskCommitDenied (Driver denied task commit)" + + s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber") { /** * If a task failed because its attempt to commit was denied, do not count this failure * towards failing the stage. This is intended to prevent spurious stage failures in cases diff --git a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala index 326e042419774..d4c23aec9bd09 100644 --- a/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala +++ b/core/src/main/scala/org/apache/spark/executor/CommitDeniedException.scala @@ -17,7 +17,7 @@ package org.apache.spark.executor -import org.apache.spark.{TaskCommitDenied, TaskFailedReason} +import org.apache.spark.{TaskKilled, TaskCommitDenied} /** * Exception thrown when a task attempts to commit output to HDFS but is denied by the driver. @@ -29,5 +29,5 @@ private[spark] class CommitDeniedException( attemptNumber: Int) extends Exception(msg) { - def toTaskFailedReason: TaskFailedReason = TaskCommitDenied(jobID, splitID, attemptNumber) + def toTaskKilled: TaskKilled = TaskCommitDenied(jobID, splitID, attemptNumber) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 5b396687dd11a..0acf3bfe1a031 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -457,9 +457,9 @@ private[spark] class Executor( taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason))) case CausedBy(cDE: CommitDeniedException) => - val reason = cDE.toTaskFailedReason + val reason = cDE.toTaskKilled setTaskFinishedAndClearInterruptStatus() - execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(reason)) case t: Throwable => // Attempt to exit cleanly by informing the driver of our failure.