Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl(
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= 1
availableCpus(i) -= taskSet.CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
Expand Down
82 changes: 63 additions & 19 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ private[spark] class TaskSetManager(
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
* this temporarily prevents a task from re-launching on an executor where
* it just failed.
*/
private[this] val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
conf.getLong("spark.task.executorBlacklistTimeout", 0L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to rename this to spark.scheduler.executorBlacklistTimeout or just spark.scheduler.blacklistTimeout

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I'm not sure if it needs to be private[this] instead of just private; we don't usually use private[this]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was @markhamstra 's recommendation - private[this] as better than private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding timeout variable name : I was considering a very specific variable name to allow for a better/future approach to handling this issue - and at that time allow us to retire this variable without potential variable name conflicts (spark.scheduler.blacklistTimeout implies a more general black-list handling, which this is not unfortunately); IMO, this is a stop gap solution until we add support for a better black list approach which handles both executors and blocks.

But until we have that, this will atleast unblock us - thankfully, this is not something which a lot of users are hitting (but is fairly common in our case unfortunately).

Given this, should we expose this in documentation ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding private[this], we don't really use it elsewhere, so I'd skip it here. It would be a bigger change to convert the whole codebase to private[this]. Its main benefit seems to be in terms of code generated but that kind of stuff usually gets inlined anyway.

I now see that this is per-task, not a blacklist for the whole executor, so it makes sense for the variable name to contain "task". In that case maybe call this spark.scheduler.executorTaskBlacklistTime -- I think it's good to put "scheduler" as the first component there since it's not really a property of tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting the entire codebase to private[this] might not be an unreasonable thing to do. Only in very rare instances does private[this] not work where plain private does (it wouldn't surprise me if every instance of private in Spark could safely be made private[this]), and it can provide some performance benefits (not everything that you would expect to get inlined away actually does), so arguably private[this] should be the default instead of private.

If someone has a few spare cycles, doing the search and replace and running the spark-perf numbers could at least give us some idea of how much we are giving away by using the slightly simpler syntax. Digging out just when private[this] makes a difference would be more work, and maybe not worth doing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case that seems like something for a separate change, and for now @mridulm should just keep it consistent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's entirely reasonable.

And fwiw, I gave the naive replace-all-private-with-private[this] a shot, and it is more complicated than I anticipated. I may stick with this for a while and may end up with that separate PR -- if for no other reason than that it is turning out to be a somewhat interesting way to probe Spark's internals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. removed private[this].
  2. renamed property to 'spark.scheduler.executorTaskBlacklistTime'


// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
Expand All @@ -71,7 +80,9 @@ private[spark] class TaskSetManager(
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
private val numFailures = new Array[Int](numTasks)
// key is taskId, value is a Map of executor id to when it failed
private[this] val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small thing, this might be better as a HashMap[(Int, String), Long] instead of a HashMap of HashMaps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a task is successful, we need to remove all blacklisted executors we know of for that task - moving to key == (Int, String) will mean we will need to iterate over all keys in failedExecutors which can be expensive.

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0

Expand Down Expand Up @@ -228,12 +239,18 @@ private[spark] class TaskSetManager(
* This method also cleans up any tasks in the list that have already
* been launched, since we want that to happen lazily.
*/
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size

while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
if (!executorIsBlacklisted(execId, index)) {
// This should almost always be list.trimEnd(1) to remove tail
list.remove(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
}
None
Expand All @@ -244,6 +261,21 @@ private[spark] class TaskSetManager(
taskAttempts(taskIndex).exists(_.host == host)
}

/**
* Is this re-execution of a failed task on an executor it already failed in before
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
*/
private[this] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
if (failedExecutors.contains(taskId)) {
val failed = failedExecutors.get(taskId).get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just write this as failedExectors(taskId)


return failed.contains(execId) &&
clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
}

false
}

/**
* Return a speculative task for a given executor if any are available. The task should not have
* an attempt running on this host, in case the host is slow. In addition, the task should meet
Expand All @@ -254,10 +286,13 @@ private[spark] class TaskSetManager(
{
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set

def canRunOnHost(index: Int): Boolean =
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)

if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
Expand All @@ -268,7 +303,7 @@ private[spark] class TaskSetManager(

// Check for node-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations.map(_.host)
if (locations.contains(host)) {
speculatableTasks -= index
Expand All @@ -280,7 +315,7 @@ private[spark] class TaskSetManager(
// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
if (racks.contains(rack)) {
speculatableTasks -= index
Expand All @@ -292,7 +327,7 @@ private[spark] class TaskSetManager(

// Check for non-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
speculatableTasks -= index
return Some((index, TaskLocality.ANY))
}
Expand All @@ -309,32 +344,32 @@ private[spark] class TaskSetManager(
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(getPendingTasksForHost(host))) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}

if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(getPendingTasksForRack(rack))
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(allPendingTasks)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
Expand Down Expand Up @@ -460,6 +495,7 @@ private[spark] class TaskSetManager(
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
index + " has already completed successfully")
}
failedExecutors.remove(index)
maybeFinishTaskSet()
}

Expand All @@ -480,17 +516,19 @@ private[spark] class TaskSetManager(
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
}
var taskMetrics : TaskMetrics = null
var failureReason = "unknown"
var failureReason: String = null
reason match {
case fetchFailed: FetchFailed =>
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
if (!successful(index)) {
successful(index) = true
tasksSuccessful += 1
}
// Not adding to failed executors for FetchFailed.
isZombie = true

case TaskKilled =>
// Not adding to failed executors for TaskKilled.
logWarning("Task %d was killed.".format(tid))

case ef: ExceptionFailure =>
Expand All @@ -504,7 +542,8 @@ private[spark] class TaskSetManager(
return
}
val key = ef.description
failureReason = "Exception failure: %s".format(ef.description)
failureReason = "Exception failure in TID %s on host %s: %s".format(
tid, info.host, ef.description)
val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
Expand Down Expand Up @@ -533,11 +572,16 @@ private[spark] class TaskSetManager(
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
logWarning(failureReason)

case _ => {}
case _ =>
failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to line 519 instead (or if you prefer, remove the default setting on line 519, which now won't ever be used)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will make default null.

}
// always add to failed executors
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
put(info.executorId, clock.getTime())
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
addPendingTask(index)
if (!isZombie && state != TaskState.KILLED) {
assert (null != failureReason)
numFailures(index) += 1
if (numFailures(index) >= maxTaskFailures) {
logError("Task %s:%d failed %d times; aborting job".format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
}

test("executors should be blacklisted after task failure, in spite of locality preferences") {
val rescheduleDelay = 300L
val conf = new SparkConf().
set("spark.task.executorBlacklistTimeout", rescheduleDelay.toString).
// dont wait to jump locality levels in this test
set("spark.locality.wait", "0")

sc = new SparkContext("local", "test", conf)
// two executors on same host, one on different.
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
("exec1.1", "host1"), ("exec2", "host2"))
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, 4, clock)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")

// Cause exec1 to fail : failure 1
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1 fails after failure 1 due to blacklist
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
}

// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1.1")

// Cause exec1.1 to fail : failure 2
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
}

// Run the task on exec2 - should work, and then fail it on exec2
{
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec2")

// Cause exec2 to fail : failure 3
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
assert(!sched.taskSetsFailed.contains(taskSet.id))

// Ensure scheduling on exec2 fails after failure 3 due to blacklist
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
}

// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)

{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")

assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")

assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)

// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
}

// we have failed the same task 4 times now : task id should now be in taskSetsFailed
assert(sched.taskSetsFailed.contains(taskSet.id))
}

def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
Expand Down