Merged

42 commits
a6e94d7
basic test framework for entire spark scheduler
squito May 10, 2016
20fb3e9
TaskResultGetter now expects there to always be non-null accum updates
squito May 10, 2016
0ca9815
switch to making backend run in another thread
squito May 13, 2016
421c2a1
remove MultiExecutorBackend for now
squito May 13, 2016
c091187
remove uncertain comment about messageScheduler
squito May 17, 2016
3b67b2a
cleanup
squito May 17, 2016
79bc384
add BlacklistIntegrationSuite and corresponding refactoring
squito May 18, 2016
8349b76
cleanup
squito May 18, 2016
7050b49
comments
squito May 18, 2016
f400741
Merge branch 'SPARK-10372-scheduler-integs' into blacklist_w_performance
squito May 18, 2016
0095376
move dummy killTask to MockBackend, otherwise occasional problems eve…
squito May 18, 2016
cb5860f
move dummy killTask to MockBackend, otherwise occasional problems eve…
squito May 18, 2016
8034995
take advantage of ExternalClusterManager extension
squito May 18, 2016
360c7cd
cleanup
squito May 18, 2016
08b28c6
Merge branch 'SPARK-10372-scheduler-integs' into blacklist_w_performance
squito May 18, 2016
c7a78b0
performance updates to mock backend + some utils
squito May 19, 2016
ee59913
add performance tests
squito May 19, 2016
22705dc
Merge branch 'master' into blacklist_w_performance
squito May 19, 2016
72d87ce
Merge branch 'scheduler_performance_tests' into blacklist_w_performance
squito May 19, 2016
4fcbc1d
bug fix in mock scheduler
squito May 19, 2016
6ed19ae
style
squito May 20, 2016
67acce9
simplification and comments
squito May 20, 2016
0530a94
Merge branch 'SPARK-10372-scheduler-integs' into scheduler_performanc…
squito May 20, 2016
17fcc9e
fix merge
squito May 20, 2016
b12b563
comments
squito May 20, 2016
930dbf7
Merge branch 'scheduler_performance_tests' into blacklist_w_performance
squito May 20, 2016
1de56d1
Merge branch 'blacklist-SPARK-8426' into blacklist_w_performance
squito May 20, 2016
5d547f4
more tests
squito May 20, 2016
d46c65d
smaller demo of performance difference
squito May 20, 2016
a394ab7
labels
squito May 23, 2016
f4609da
wip -- some instrumentation, easier repro of slowdown
squito May 23, 2016
e852e0c
notes mostly
squito May 23, 2016
8b78d3f
more notes
squito May 24, 2016
883bfd7
fix race condition w/ runningTaskSets
squito May 24, 2016
4358b2f
updated logging
squito May 24, 2016
f850a30
log executor in addition to host
squito May 24, 2016
4ac99c6
wip, logging and some logic updates
squito May 25, 2016
6f02ded
performance suite updates
squito May 25, 2016
71f1b47
optimization -- skip blacklisted executors earlier in scheduling loop
squito May 26, 2016
ffd0f25
bug fix -- update the right cache in nodeBlacklistForStage
squito May 26, 2016
3effef6
cleanup, TODOs
squito May 26, 2016
456f578
process tasks in LIFO order for all performance tests, more cases, etc.
squito May 26, 2016
@@ -24,6 +24,8 @@ import org.apache.spark.util.Clock

/**
* The interface to determine executor blacklist and node blacklist.
*
* TODO notes on thread-safety
*/
private[scheduler] trait BlacklistStrategy {
/** Define a time interval to expire failure information of executors */
@@ -81,10 +83,12 @@
*/
private[scheduler] class SingleTaskStrategy(
val expireTimeInMilliseconds: Long) extends BlacklistStrategy {
var executorBlacklistCallCount = 0L
def getExecutorBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
atomTask: StageAndPartition,
clock: Clock): Set[String] = {
executorBlacklistCallCount += 1
executorIdToFailureStatus.filter{
case (_, failureStatus) => failureStatus.numFailuresPerTask.keySet.contains(atomTask) &&
clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds
@@ -104,10 +108,19 @@
private[scheduler] class AdvancedSingleTaskStrategy(
expireTimeInMilliseconds: Long) extends SingleTaskStrategy(expireTimeInMilliseconds) {

var nodeBlacklistCallCount = 0L
override def getNodeBlacklistForStage(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
stageId: Int,
clock: Clock): Set[String] = {
nodeBlacklistCallCount += 1
// when there is one bad node (or executor), this is really slow. We pile up a ton of
// task failures, and we've got to iterate through failure data for each task. Furthermore,
// since we don't actively blacklist the bad node / executor, we just keep assigning it more
// tasks that fail. And after each failure, we invalidate our cache, which means we need
// to call this again.
// This can be particularly painful when the failures are fast, since it's likely the only
// executor with free slots is the one which just failed some tasks, which just keep going ...
val nodes = executorIdToFailureStatus.filter{
case (_, failureStatus) =>
failureStatus.numFailuresPerTask.keySet.map(_.stageId).contains(stageId) &&
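The comment above describes a feedback loop: every failure invalidates the cache, and every recomputation scans all the failure data accumulated so far. A self-contained toy model of that cost (plain Scala, not Spark code; the failure count is made up for illustration):

```scala
// Toy model of the slowdown described in the comment above: each task failure
// invalidates the blacklist cache, and each recomputation scans every failure
// record accumulated so far, so total work grows quadratically with the number
// of failures piled up on the one bad executor.
object BlacklistCostModel {
  def main(args: Array[String]): Unit = {
    val failures = 1000       // failures on the single bad executor (made-up number)
    var recordedFailures = 0  // failure records accumulated so far
    var recordsScanned = 0L   // total records iterated across all recomputations

    (1 to failures).foreach { _ =>
      recordedFailures += 1               // one more task fails on the bad executor
      recordsScanned += recordedFailures  // cache invalidated -> full rescan on next lookup
    }
    println(s"$failures failures -> $recordsScanned record scans (~ failures^2 / 2)")
  }
}
```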
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Clock
import org.apache.spark.util.SystemClock
import org.apache.spark.util.ThreadUtils
@@ -37,7 +38,7 @@ import org.apache.spark.util.Utils
*/
private[spark] class BlacklistTracker(
sparkConf: SparkConf,
clock: Clock = new SystemClock()) extends BlacklistCache{
clock: Clock = new SystemClock()) extends BlacklistCache with Logging {

// maintain a ExecutorId --> FailureStatus HashMap
private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap()
@@ -64,24 +65,39 @@
def stop(): Unit = {
scheduler.shutdown()
scheduler.awaitTermination(10, TimeUnit.SECONDS)
logInfo(s"Executor Blacklist callcount =" +
s" ${strategy.asInstanceOf[SingleTaskStrategy].executorBlacklistCallCount}")
strategy match {
case as: AdvancedSingleTaskStrategy =>
logInfo(s"Node Blacklist callcount =" +
s" ${as.nodeBlacklistCallCount}")
case _ => // no op
}
}

// The actual implementation is delegated to strategy
private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized {
val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock)
logInfo(s"Checked for expired blacklist: ${updated}")
if (updated) {
invalidateCache()
}
}

// The actual implementation is delegated to strategy
def executorBlacklist(
sched: TaskSchedulerImpl, stageId: Int, partition: Int): Set[String] = synchronized {
sched: TaskSchedulerImpl,
stageId: Int,
partition: Int): Set[String] = synchronized {
// note that this is NOT only called from the dag scheduler event loop
val atomTask = StageAndPartition(stageId, partition)
if (!isBlacklistExecutorCacheValid) {
reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock)
} else {
// getExecutorBlacklistFromCache(atomTask).getOrElse(Set.empty[String])
getExecutorBlacklistFromCache(atomTask).getOrElse {
// TODO Why is this necessary? (it's because we clear the entire map on an invalidate,
// and lazily rebuild it)
reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock)
}
}
@@ -100,6 +116,10 @@

// The actual implementation is delegated to strategy
def nodeBlacklistForStage(stageId: Int): Set[String] = synchronized {
// TODO here and elsewhere -- we invalidate the cache way too often. In general, we should
// be able to do an in-place update of the caches. (a) this is slow and (b) it makes
// it really hard to track when the blacklist actually changes (would be *really* nice to
// log a msg about node level blacklisting at least)
if (isBlacklistNodeForStageCacheValid) {
getNodeBlacklistForStageFromCache(stageId).getOrElse(
reEvaluateNodeBlacklistForStageAndUpdateCache(stageId))
@@ -176,14 +196,15 @@
private def executorsOnBlacklistedNode(
sched: TaskSchedulerImpl,
atomTask: StageAndPartition): Set[String] = {
nodeBlacklistForStage(atomTask.stageId).flatMap(sched.getExecutorsAliveOnHost(_)
.getOrElse(Set.empty[String])).toSet
nodeBlacklistForStage(atomTask.stageId).flatMap(sched.getExecutorsAliveOnHost(_)
.getOrElse(Set.empty[String]))
}

private def reEvaluateExecutorBlacklistAndUpdateCache(
sched: TaskSchedulerImpl,
atomTask: StageAndPartition,
clock: Clock): Set[String] = {
// TODO some kind of logging when the blacklist is *updated*
val executors = executorsOnBlacklistedNode(sched, atomTask) ++
strategy.getExecutorBlacklist(executorIdToFailureStatus, atomTask, clock)
updateBlacklistExecutorCache(atomTask, executors)
@@ -192,16 +213,16 @@

private def reEvaluateNodeBlacklistForStageAndUpdateCache(stageId: Int): Set[String] = {
val nodes = strategy.getNodeBlacklistForStage(executorIdToFailureStatus, stageId, clock)
updateBlacklistNodeCache(nodes)
updateBlacklistNodeForStageCache(stageId, nodes)
// updateBlacklistNodeCache(nodes)
nodes
}
}

/**
* Hide cache details in this trait to keep the code clean and avoid operational mistakes
*/
private[scheduler] trait BlacklistCache {

private[scheduler] trait BlacklistCache extends Logging {
// local cache to minimize the work when querying blacklisted executors and nodes
private val blacklistExecutorCache = mutable.HashMap.empty[StageAndPartition, Set[String]]
private val blacklistNodeCache = mutable.Set.empty[String]
@@ -241,6 +262,11 @@
protected def updateBlacklistNodeForStageCache(
stageId: Int,
blacklistNode: Set[String]): Unit = cacheLock.synchronized {
// TODO this needs to actually get called, and add unit test
val wasBlacklisted = blacklistNodeForStageCache.getOrElse(stageId, Set.empty[String])
if (wasBlacklisted != blacklistNode) {
logInfo(s"Updating node blacklist for Stage ${stageId} to ${blacklistNode}")
}
if (!_isBlacklistNodeForStageCacheValid) {
blacklistNodeForStageCache.clear()
}
@@ -249,6 +275,7 @@
}

protected def invalidateCache(): Unit = cacheLock.synchronized {
logInfo("invalidating blacklist cache")
_isBlacklistExecutorCacheValid = false
_isBlacklistNodeCacheValid = false
_isBlacklistNodeForStageCacheValid = false
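The TODO in nodeBlacklistForStage points at in-place cache updates instead of clearing everything after each failure. A minimal sketch of that idea, under stated assumptions: the class, method names, and maxNodeFailures threshold below are hypothetical and not part of this PR.

```scala
import scala.collection.mutable

// Hypothetical sketch, not part of this PR: update only the affected stage's entry on each
// failure instead of invalidating the whole cache and lazily rebuilding it. All names here
// (IncrementalNodeBlacklist, taskFailed, maxNodeFailures) are assumptions for illustration.
class IncrementalNodeBlacklist(maxNodeFailures: Int) {

  // stageId -> (host -> failure count)
  private val failuresByNodeForStage =
    mutable.HashMap.empty[Int, mutable.HashMap[String, Int]]
  // stageId -> hosts currently blacklisted for that stage, kept current on every failure
  private val nodeBlacklistForStage = mutable.HashMap.empty[Int, mutable.Set[String]]

  /** O(1) update per task failure, instead of a full rebuild of the cache. */
  def taskFailed(stageId: Int, host: String): Unit = synchronized {
    val hostFailures = failuresByNodeForStage.getOrElseUpdate(stageId, mutable.HashMap.empty)
    val newCount = hostFailures.getOrElse(host, 0) + 1
    hostFailures(host) = newCount
    if (newCount >= maxNodeFailures &&
        nodeBlacklistForStage.getOrElseUpdate(stageId, mutable.Set.empty).add(host)) {
      // the blacklist actually changed here, which is the natural place to log it
      println(s"Blacklisting $host for stage $stageId after $newCount failures")
    }
  }

  /** O(1) read from the scheduling loop; nothing to recompute. */
  def blacklist(stageId: Int): Set[String] = synchronized {
    nodeBlacklistForStage.get(stageId).map(_.toSet).getOrElse(Set.empty[String])
  }
}
```

With this shape, a repeated `taskFailed(stageId, "host-0")` flips the host into `blacklist(stageId)` exactly once, so there is a single, well-defined point to log node-level blacklisting and no cache to invalidate.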
@@ -1411,7 +1411,7 @@ class DAGScheduler(
stage.clearFailures()
} else {
stage.latestInfo.stageFailed(errorMessage.get)
logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}")
}

outputCommitCoordinator.stageEnd(stage.id)
@@ -59,7 +59,7 @@ private[spark] class DirectTaskResult[T](

val numUpdates = in.readInt
if (numUpdates == 0) {
accumUpdates = null
accumUpdates = Seq()
} else {
val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]
for (i <- 0 until numUpdates) {
@@ -248,10 +248,16 @@
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// TODO unit test, and add executor-stage filtering as well
// This is an optimization -- the taskSet might contain a very long list of pending tasks.
// Rather than wasting time checking the offer against each task, and then realizing the
// executor is blacklisted, just filter out the bad executor immediately.
val nodeBlacklist = taskSet.blacklistTracker.map{_.nodeBlacklistForStage(taskSet.stageId)}
.getOrElse(Set())
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
if (!nodeBlacklist(host) && availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
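To show the shape of that optimization in isolation, and where the executor-level filtering mentioned in the TODO could slot in, here is a standalone sketch with simplified stand-in types; Offer, usableOffers, and the executorBlacklist parameter are illustrative only, not Spark's API:

```scala
// Standalone sketch of the per-offer filtering pattern above (simplified types, not Spark's):
// compute the blacklists once per round of offers and drop bad offers before the expensive
// per-task matching loop ever sees them.
object OfferFilteringSketch {
  case class Offer(executorId: String, host: String, cores: Int)

  def usableOffers(
      offers: Seq[Offer],
      nodeBlacklist: Set[String],
      executorBlacklist: Set[String],  // where the TODO's executor-stage filtering would go
      cpusPerTask: Int): Seq[Offer] = {
    offers.filter { o =>
      !nodeBlacklist(o.host) && !executorBlacklist(o.executorId) && o.cores >= cpusPerTask
    }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("0", "host-0", 2), Offer("4", "host-1", 2))
    // host-0 is blacklisted for this stage, so only the host-1 offer survives
    println(usableOffers(offers, Set("host-0"), Set.empty, cpusPerTask = 2))
  }
}
```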
@@ -462,8 +462,8 @@
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s" $taskLocality, ${serializedTask.limit} bytes)")
logInfo(s"Starting $taskName (TID $taskId, $host, exec ${info.executorId}, " +
s"partition ${task.partitionId},$taskLocality, ${serializedTask.limit} bytes)")

sched.dagScheduler.taskStarted(task, info)
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
@@ -603,8 +603,9 @@
sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
if (!successful(index)) {
tasksSuccessful += 1
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s / exec %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration, info.host, info.executorId,
tasksSuccessful, numTasks))
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
@@ -635,8 +636,8 @@
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " +
reason.asInstanceOf[TaskFailedReason].toErrorString
val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," +
s" exec ${info.executorId}): ${reason.asInstanceOf[TaskFailedReason].toErrorString}"
val failureException: Option[Throwable] = reason match {
case fetchFailed: FetchFailed =>
logWarning(failureReason)
@@ -1 +1,2 @@
org.apache.spark.scheduler.DummyExternalClusterManager
org.apache.spark.scheduler.MockExternalClusterManager
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark._

class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{

val badHost = "host-0"

/**
* This backend just always fails if the task is executed on a bad host, but otherwise succeeds
* all tasks.
*/
def badHostBackend(): Unit = {
val task = backend.beginTask()
val host = backend.executorIdToExecutor(task.executorId).host
if (host == badHost) {
backend.taskFailed(task, new RuntimeException("I'm a bad host!"))
} else {
backend.taskSuccess(task, 42)
}
}

// Test demonstrating the issue -- without a config change, the scheduler keeps scheduling
// according to locality preferences, and so the job fails
testScheduler("If preferred node is bad, without blacklist job will fail") {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}

// even with the blacklist turned on, if maxTaskFailures is not more than the number
// of executors on the bad node, then locality preferences will lead to us cycling through
// the executors on the bad node, and still failing the job
testScheduler(
"With blacklist on, job will still fail if there are too many bad executors on bad host",
extraConfs = Seq(
// just set this to something much longer than the test duration
("spark.scheduler.executorTaskBlacklistTime", "10000000")
)
) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(3, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results.isEmpty)
assertDataStructuresEmpty(noFailure = false)
}

// Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually
// schedule on a good node and succeed the job
testScheduler(
"Bad node with multiple executors, job will still succeed with the right confs",
extraConfs = Seq(
// just set this to something much longer than the test duration
("spark.scheduler.executorTaskBlacklistTime", "10000000"),
// this has to be higher than the number of executors on the bad host
("spark.task.maxFailures", "5"),
// just to avoid this test taking too long
("spark.locality.wait", "10ms")
)
) {
val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
withBackend(badHostBackend _) {
val jobFuture = submit(rdd, (0 until 10).toArray)
val duration = Duration(1, SECONDS)
Await.ready(jobFuture, duration)
}
assert(results === (0 until 10).map { _ -> 42 }.toMap)
assertDataStructuresEmpty(noFailure = true)
}

}

class MultiExecutorMockBackend(
conf: SparkConf,
taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {

val nHosts = conf.getInt("spark.testing.nHosts", 5)
val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4)
val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2)

override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = {
(0 until nHosts).flatMap { hostIdx =>
val hostName = "host-" + hostIdx
(0 until nExecutorsPerHost).map { subIdx =>
val executorId = (hostIdx * nExecutorsPerHost + subIdx).toString
executorId ->
ExecutorTaskStatus(host = hostName, executorId = executorId, nCoresPerExecutor)
}
}.toMap
}

override def defaultParallelism(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor
}

class MockRDDWithLocalityPrefs(
sc: SparkContext,
numPartitions: Int,
shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]],
val preferredLoc: String) extends MockRDD(sc, numPartitions, shuffleDeps) {
override def getPreferredLocations(split: Partition): Seq[String] = {
Seq(preferredLoc)
}
}
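One more case in the same style as the suite above could exercise the cluster shape directly. The test below is a hypothetical sketch, not part of this PR; it reuses the suite's helpers (testScheduler, withBackend, badHostBackend, submit) and the spark.testing.* keys that MultiExecutorMockBackend already reads, with made-up values:

```scala
// Hypothetical additional test (not in this PR), following the pattern of the cases above:
// give the bad host more executors than the default spark.task.maxFailures, so that even
// with the blacklist on, locality preferences exhaust the retries before a good executor
// is ever tried, and the job still fails.
testScheduler(
  "job still fails when the bad host has more executors than maxTaskFailures",
  extraConfs = Seq(
    ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
    ("spark.testing.nHosts", "2"),
    ("spark.testing.nExecutorsPerHost", "8")
  )
) {
  val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
  withBackend(badHostBackend _) {
    val jobFuture = submit(rdd, (0 until 10).toArray)
    Await.ready(jobFuture, Duration(3, SECONDS))
  }
  assert(results.isEmpty)
  assertDataStructuresEmpty(noFailure = false)
}
```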