
Commit 1df2a91

Throwing exception if SparkHadoopWriter commit denied
However, the Executor does not treat this like any other ExceptionFailure; instead it sends a specific TaskEndReason to the driver. After logging it, the driver simply ignores the task whose commit was denied. If the task that attempted to commit first later fails, it will be resubmitted. TODO: unit tests; the current unit tests don't capture this workflow all the way down to the Executor level.
1 parent d431144 commit 1df2a91
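
Below is a minimal, self-contained sketch of the workflow described in the commit message: a denied commit on the task side is surfaced to the driver as a dedicated outcome rather than a generic exception failure. All names here (TaskOutcome, driverAuthorizesCommit, runTask) are simplified stand-ins for illustration, not the classes added by this commit.

object CommitDenialFlowSketch {

  sealed trait TaskOutcome
  case object Committed extends TaskOutcome
  case class CommitDenied(jobID: Int, splitID: Int, attemptID: Int) extends TaskOutcome

  // Stand-in for the new exception: carries enough context to identify the denied attempt.
  class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)
    extends Exception(msg) {
    def toOutcome: TaskOutcome = CommitDenied(jobID, splitID, attemptID)
  }

  // Stand-in for the driver-side coordinator: only one attempt per partition may commit.
  def driverAuthorizesCommit(attemptID: Int, authorizedAttempt: Int): Boolean =
    attemptID == authorizedAttempt

  // Stand-in for the executor's task runner: a denied commit becomes a dedicated
  // outcome instead of an ordinary task failure.
  def runTask(jobID: Int, splitID: Int, attemptID: Int, authorizedAttempt: Int): TaskOutcome =
    try {
      if (!driverAuthorizesCommit(attemptID, authorizedAttempt)) {
        throw new CommitDeniedException(
          s"attempt $attemptID denied commit for split $splitID", jobID, splitID, attemptID)
      }
      Committed
    } catch {
      case cde: CommitDeniedException => cde.toOutcome
    }

  def main(args: Array[String]): Unit = {
    // Attempt 1 loses to attempt 0, so its commit is denied: prints CommitDenied(0,3,1)
    println(runTask(jobID = 0, splitID = 3, attemptID = 1, authorizedAttempt = 0))
  }
}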

8 files changed: +76, -11 lines
core/src/main/scala/org/apache/spark/CommitDeniedException.scala (new file)

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Exception thrown when a task attempts to commit output to Hadoop, but
+ * is denied by the driver.
+ */
+@DeveloperApi
+class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)
+  extends Exception(msg) {
+  def toTaskEndReason(): TaskEndReason = new TaskCommitDenied(jobID, splitID, attemptID)
+}

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 6 additions & 2 deletions
@@ -122,10 +122,14 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
           }
         }
       } else {
-        logInfo(s"$taID: Not committed because DAGScheduler did not authorize commit")
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
+      val msg: String = s"No need to commit output of task because needsTaskCommit=false: ${taID.value}"
+      logInfo(msg)
+      throw new CommitDeniedException(msg, jobID, splitID, attemptID)
    }
  }
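
Because the context lines above are mostly closing braces, here is a rough sketch of where the two new throw sites sit in the commit path. The booleans and the simplified exception are stand-ins for illustration, not SparkHadoopWriter's actual fields.

object CommitPathSketch {
  class CommitDeniedException(msg: String) extends Exception(msg)

  // needsTaskCommit mirrors the OutputCommitter's answer; driverAuthorized mirrors the
  // driver's decision. Both branches that previously only logged now also throw.
  def commit(needsTaskCommit: Boolean, driverAuthorized: Boolean)(doCommit: () => Unit): Unit = {
    if (needsTaskCommit) {
      if (driverAuthorized) {
        doCommit()  // unchanged happy path: actually commit the task's output
      } else {
        throw new CommitDeniedException("driver did not authorize commit")
      }
    } else {
      throw new CommitDeniedException("needsTaskCommit=false")
    }
  }

  def main(args: Array[String]): Unit =
    commit(needsTaskCommit = true, driverAuthorized = true)(() => println("committed"))
}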

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 14 additions & 0 deletions
@@ -146,6 +146,20 @@ case object TaskKilled extends TaskFailedReason {
   override def toErrorString: String = "TaskKilled (killed intentionally)"
 }
 
+/**
+ * :: DeveloperApi ::
+ * Task requested the driver to commit, but was denied.
+ */
+@DeveloperApi
+case class TaskCommitDenied(
+    jobID: Int,
+    splitID: Int,
+    attemptID: Int)
+  extends TaskFailedReason {
+  override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
+    s" for job: $jobID, split: $splitID, attempt: $attemptID"
+}
+
 /**
  * :: DeveloperApi ::
  * The task failed because the executor that it was running on was lost. This may happen because
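
A small self-contained check of the new error-string format, using local simplified copies of the trait and case class from the hunk above (for illustration only):

object TaskCommitDeniedStringSketch {
  trait TaskFailedReason { def toErrorString: String }

  // Local copy of the case class above, reduced to what the example needs.
  case class TaskCommitDenied(jobID: Int, splitID: Int, attemptID: Int) extends TaskFailedReason {
    override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
      s" for job: $jobID, split: $splitID, attempt: $attemptID"
  }

  def main(args: Array[String]): Unit = {
    val reason = TaskCommitDenied(jobID = 2, splitID = 7, attemptID = 1)
    // Prints: TaskCommitDenied (Driver denied task commit) for job: 2, split: 7, attempt: 1
    println(reason.toErrorString)
  }
}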

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 0 deletions
@@ -248,6 +248,11 @@ private[spark] class Executor(
           execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
         }
 
+        case cDE: CommitDeniedException => {
+          val reason = cDE.toTaskEndReason
+          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+        }
+
         case t: Throwable => {
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will delegate to
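
One detail worth noting: the new case is matched before the generic Throwable catch-all, which is what keeps a denied commit from being reported as an ordinary exception failure. A tiny sketch with stand-in types (not the Executor's real ones):

object CatchOrderSketch {
  class CommitDeniedException(msg: String) extends Exception(msg)

  def classify(work: () => Unit): String =
    try { work(); "SUCCESS" }
    catch {
      case _: CommitDeniedException => "FAILED (commit denied)" // specific reason wins
      case _: Throwable             => "FAILED (exception)"     // generic fallback
    }

  def main(args: Array[String]): Unit = {
    println(classify(() => throw new CommitDeniedException("denied"))) // FAILED (commit denied)
    println(classify(() => throw new RuntimeException("boom")))        // FAILED (exception)
  }
}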

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 0 deletions
@@ -1082,6 +1082,9 @@ class DAGScheduler(
           handleExecutorLost(bmAddress.executorId, fetchFailed = true, Some(task.epoch))
         }
 
+      case TaskCommitDenied(jobID, splitID, attemptID) =>
+        // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
+
       case ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics) =>
         // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala

Lines changed: 12 additions & 6 deletions
@@ -17,16 +17,15 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap}
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
 
 import scala.collection.{Map => ScalaImmutableMap}
-import scala.collection.concurrent.{Map => ScalaConcurrentMap}
 import scala.collection.convert.decorateAsScala._
 
 import akka.actor.{ActorRef, Actor}
 
 import org.apache.spark.{SparkConf, Logging}
-import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
 
 private[spark] sealed trait OutputCommitCoordinationMessage
 
@@ -119,7 +118,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
     coordinatorActor = Some(actor)
     executorRequestHandlingThreadPool = {
       if (isDriver) {
-        Some(Executors.newFixedThreadPool(4))
+        Some(Utils.newDaemonFixedThreadPool(8, "OutputCommitCoordinator"))
       } else {
         None
       }
@@ -174,8 +173,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
       task: TaskId,
       partId: PartitionId,
      attempt: TaskAttemptId): Unit = {
-    executorRequestHandlingThreadPool.foreach(_.submit(
-      new AskCommitRunnable(requester, this, stage, task, partId, attempt)))
+    executorRequestHandlingThreadPool match {
+      case Some(threadPool) =>
+        threadPool.submit(new AskCommitRunnable(requester, this, stage, task, partId, attempt))
+      case None =>
+        logWarning("Got a request to commit output, but the OutputCommitCoordinator was already" +
+          " shut down. Request is being denied.")
+        requester ! false
+    }
+
  }
 
  private def handleTaskCompletion(
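
Two things change here: the thread pool becomes a named daemon pool, and the Option is matched explicitly so that a request arriving after shutdown is answered with a denial instead of being silently dropped by foreach. A sketch of that Option handling follows, with a plain callback standing in for the Akka requester (simplified stand-ins, not the coordinator's real types):

import java.util.concurrent.{ExecutorService, Executors}

object AskCommitSketch {
  def handleAskPermissionToCommit(
      pool: Option[ExecutorService],
      reply: Boolean => Unit)(decide: () => Boolean): Unit = {
    pool match {
      case Some(threadPool) =>
        // Normal path: decide off the caller's thread and reply asynchronously.
        threadPool.submit(new Runnable { def run(): Unit = reply(decide()) })
      case None =>
        // Pool already gone: a foreach would silently drop the request,
        // so the None branch answers with an explicit denial.
        reply(false)
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    handleAskPermissionToCommit(Some(pool), granted => println(s"granted=$granted"))(() => true)
    handleAskPermissionToCommit(None, granted => println(s"granted=$granted"))(() => true)
    pool.shutdown()
  }
}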

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 1 deletion
@@ -646,6 +646,9 @@ private[spark] class TaskSetManager(
             s"${ef.className} (${ef.description}) [duplicate $dupCount]")
         }
 
+      case e: TaskCommitDenied =>
+        logWarning(failureReason)
+
       case e: TaskFailedReason => // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
 
@@ -657,7 +660,7 @@
     put(info.executorId, clock.getTime())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
-    if (!isZombie && state != TaskState.KILLED) {
+    if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
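
The effect of the added isInstanceOf guard is that a denied commit is logged and the task resubmitted, but it no longer counts toward the per-task failure limit. A simplified stand-in illustration (not TaskSetManager's real bookkeeping):

object FailureCountSketch {
  sealed trait Reason
  case object TaskCommitDenied extends Reason
  case object ExceptionFailure extends Reason

  // Returns the updated failure count and whether the task set should be aborted.
  def handleFailedTask(numFailures: Int, reason: Reason, maxTaskFailures: Int): (Int, Boolean) = {
    val counted = reason match {
      case TaskCommitDenied => numFailures      // denied commits are not counted
      case _                => numFailures + 1  // ordinary failures still count
    }
    (counted, counted >= maxTaskFailures)
  }

  def main(args: Array[String]): Unit = {
    println(handleFailedTask(3, TaskCommitDenied, maxTaskFailures = 4)) // (3,false)
    println(handleFailedTask(3, ExceptionFailure, maxTaskFailures = 4)) // (4,true)
  }
}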

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -20,12 +20,11 @@ package org.apache.spark.scheduler
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
+import org.mockito.Mockito.mock
 import org.scalatest.{BeforeAndAfter, FunSuiteLike}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
-import org.mockito.Mockito.mock
-
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
