51 commits
975a2a3  enhance blacklist mechanism (wei-mao-intel, Dec 29, 2015)
51d3c88  change import order to meet new scala style check rule (wei-mao-intel, Jan 13, 2016)
7e52311  simplify code and fix typo (wei-mao-intel, Feb 23, 2016)
b600604  style (squito, May 10, 2016)
45525a1  small refactoring (squito, May 10, 2016)
a6e94d7  basic test framework for entire spark scheduler (squito, May 10, 2016)
20fb3e9  TaskResultGetter now expects there to always be non-null accum updates (squito, May 10, 2016)
0ca9815  switch to making backend run in another thread (squito, May 13, 2016)
421c2a1  remove MultiExecutorBackend for now (squito, May 13, 2016)
c091187  remove uncertain comment about messageScheduler (squito, May 17, 2016)
3b67b2a  cleanup (squito, May 17, 2016)
79bc384  add BlacklistIntegrationSuite and corresponding refactoring (squito, May 18, 2016)
8349b76  cleanup (squito, May 18, 2016)
7050b49  comments (squito, May 18, 2016)
f400741  Merge branch 'SPARK-10372-scheduler-integs' into blacklist_w_performance (squito, May 18, 2016)
0095376  move dummy killTask to MockBackend, otherwise occasional problems eve… (squito, May 18, 2016)
cb5860f  move dummy killTask to MockBackend, otherwise occasional problems eve… (squito, May 18, 2016)
8034995  take advantage of ExternalClusteManager extension (squito, May 18, 2016)
360c7cd  cleanup (squito, May 18, 2016)
08b28c6  Merge branch 'SPARK-10372-scheduler-integs' into blacklist_w_performance (squito, May 18, 2016)
c7a78b0  performance updates to mock backend + some utils (squito, May 19, 2016)
ee59913  add performance tests (squito, May 19, 2016)
22705dc  Merge branch 'master' into blacklist_w_performance (squito, May 19, 2016)
72d87ce  Merge branch 'scheduler_performance_tests' into blacklist_w_performance (squito, May 19, 2016)
4fcbc1d  bug fix in mock scheduler (squito, May 19, 2016)
6ed19ae  style (squito, May 20, 2016)
67acce9  simplification and comments (squito, May 20, 2016)
0530a94  Merge branch 'SPARK-10372-scheduler-integs' into scheduler_performanc… (squito, May 20, 2016)
17fcc9e  fix merge (squito, May 20, 2016)
b12b563  comments (squito, May 20, 2016)
930dbf7  Merge branch 'scheduler_performance_tests' into blacklist_w_performance (squito, May 20, 2016)
f6bb6de  Merge branch 'master' into blacklist-SPARK-8426 (squito, May 20, 2016)
1de56d1  Merge branch 'blacklist-SPARK-8426' into blacklist_w_performance (squito, May 20, 2016)
5d547f4  more tests (squito, May 20, 2016)
d46c65d  smaller demo of performance difference (squito, May 20, 2016)
a394ab7  labels (squito, May 23, 2016)
f4609da  wip -- some instrumentation, easier repro of slowdown (squito, May 23, 2016)
e852e0c  notes mostly (squito, May 23, 2016)
8b78d3f  more notes (squito, May 24, 2016)
883bfd7  fix race condition w/ runningTaskSets (squito, May 24, 2016)
4358b2f  updated logging (squito, May 24, 2016)
f850a30  log executor in addition to host (squito, May 24, 2016)
4ac99c6  wip, logging and some logic updates (squito, May 25, 2016)
6f02ded  performance suite updates (squito, May 25, 2016)
71f1b47  optimization -- skip blacklisted executors earlier in scheduling loop (squito, May 26, 2016)
ffd0f25  bug fix -- update the right cache in nodeBlacklistForStage (squito, May 26, 2016)
3effef6  cleanup, TODOs (squito, May 26, 2016)
456f578  process tasks in LIFO order for all performance tests, more cases, etc. (squito, May 26, 2016)
794c804  Merge branch 'master' into blacklist-SPARK-8426 (squito, May 26, 2016)
1f57963  Merge branch 'blacklist_w_performance' into blacklist-SPARK-8426 (squito, May 26, 2016)
8f2534b  remove performance suite from this branch (squito, May 26, 2016)
15 changes: 15 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -218,6 +218,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _blacklistTracker: Option[BlacklistTracker] = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -327,6 +328,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient

private[spark] def cleaner: Option[ContextCleaner] = _cleaner

private[spark] def blacklistTracker: Option[BlacklistTracker] = _blacklistTracker

private[spark] var checkpointDir: Option[String] = None

// Thread Local variable that can be used by users to pass information down the stack
@@ -534,6 +537,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
}
_executorAllocationManager.foreach(_.start())

// By default blacklistTracker is enabled.
_blacklistTracker = if (_conf.getBoolean("spark.scheduler.blacklist.enabled", true)) {
Some(new BlacklistTracker(_conf))
} else {
None
}
_blacklistTracker.foreach(_.start())

_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -1766,6 +1777,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
Utils.tryLogNonFatalError {
_executorAllocationManager.foreach(_.stop())
}
Utils.tryLogNonFatalError {
_blacklistTracker.foreach(_.stop())
}

if (_listenerBusStarted) {
Utils.tryLogNonFatalError {
listenerBus.stop()
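As a reference for the wiring above: the tracker is created and started during SparkContext initialization and stopped during shutdown, all guarded by a single flag. Below is a minimal sketch of turning it off from user code, assuming only the "spark.scheduler.blacklist.enabled" key shown in the diff; the object name, app name, and master are placeholder values.

import org.apache.spark.{SparkConf, SparkContext}

object BlacklistToggleExample {
  def main(args: Array[String]): Unit = {
    // "spark.scheduler.blacklist.enabled" is the flag added in the diff above; it defaults to true.
    val conf = new SparkConf()
      .setAppName("blacklist-toggle-example")
      .setMaster("local[4]")
      .set("spark.scheduler.blacklist.enabled", "false")

    // With the flag set to "false", SparkContext creates no BlacklistTracker and never starts one.
    val sc = new SparkContext(conf)
    sc.stop()
  }
}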
161 changes: 161 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.util.Clock

/**
* The interface for determining the executor blacklist and the node blacklist.
*
* TODO notes on thread-safety
*/
private[scheduler] trait BlacklistStrategy {
/** The time interval after which an executor's failure information expires. */
val expireTimeInMilliseconds: Long

/** Return the blacklisted executors for the given stage and partition. */
def getExecutorBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
atomTask: StageAndPartition,
clock: Clock): Set[String]

/** Return all blacklisted nodes. */
def getNodeBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
clock: Clock): Set[String]

/**
* Return all blacklisted nodes for the specified stage. By default this returns the same result
* as getNodeBlacklist; strategy implementations may override it.
*/
def getNodeBlacklistForStage(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
stageId: Int,
clock: Clock): Set[String] = getNodeBlacklist(executorIdToFailureStatus, clock)

/**
* Remove executors from the blacklist whose failure information has expired. Return true if any
* executors were removed from the blacklist, false otherwise. The default implementation removes
* executors from the blacklist after [[expireTimeInMilliseconds]].
*/
def expireExecutorsInBlackList(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
clock: Clock): Boolean = {

val now = clock.getTimeMillis()
val expiredKey = executorIdToFailureStatus.filter {
case (_, failureStatus) =>
(now - failureStatus.updatedTime) >= expireTimeInMilliseconds
}.keySet

if (expiredKey.isEmpty) {
false
} else {
executorIdToFailureStatus --= expiredKey
true
}
}
}

/**
* This strategy keeps the same semantics as the standard behavior before Spark 1.6.
*
* If an executor fails while running "task A", the executor is blacklisted for "task A" only;
* it is still considered healthy for other tasks. The node blacklist is always empty.
*/
private[scheduler] class SingleTaskStrategy(
val expireTimeInMilliseconds: Long) extends BlacklistStrategy {
var executorBlacklistCallCount = 0L
def getExecutorBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
atomTask: StageAndPartition,
clock: Clock): Set[String] = {
executorBlacklistCallCount += 1
executorIdToFailureStatus.filter{
case (_, failureStatus) => failureStatus.numFailuresPerTask.keySet.contains(atomTask) &&
clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds
}.keys.toSet
}

def getNodeBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
clock: Clock): Set[String] = Set.empty[String]
}

/**
* Compared to SingleTaskStrategy, this strategy also supports a node blacklist. Once more than
* one executor on the same node has failed tasks for a given stage, all executors on that node
* are blacklisted, so no further tasks from that stage will be allocated to the node.
*/
private[scheduler] class AdvancedSingleTaskStrategy(
expireTimeInMilliseconds: Long) extends SingleTaskStrategy(expireTimeInMilliseconds) {

var nodeBlacklistCallCount = 0L
override def getNodeBlacklistForStage(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
stageId: Int,
clock: Clock): Set[String] = {
nodeBlacklistCallCount += 1
// when there is one bad node (or executor), this is really slow. We pile up a ton of
// task failures, and we've got to iterate through failure data for each task. Furthermore,
// since we don't actively blacklist the bad node / executor, we just keep assigning it more
// tasks that fail. And after each failure, we invalidate our cache, which means we need
// to call this again.
// This can be particularly painful when the failures are fast, since it's likely the only
// executor with free slots is the one that just failed some tasks, so the failures just keep going ...
val nodes = executorIdToFailureStatus.filter{
case (_, failureStatus) =>
failureStatus.numFailuresPerTask.keySet.map(_.stageId).contains(stageId) &&
clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds
}.values.map(_.host)
getDuplicateElem(nodes, 1)
}

override def getNodeBlacklist(
executorIdToFailureStatus: mutable.HashMap[String, FailureStatus],
clock: Clock): Set[String] = {
// Collect the host of every executor that has failure status.
val nodes = executorIdToFailureStatus.values.map(_.host)
getDuplicateElem(nodes, 1)
}

// A helper function to find hosts that have more than dupTimes blacklisted executors on them.
private def getDuplicateElem(nodes: Iterable[String], dupTimes: Int): Set[String] = {
nodes.groupBy(identity).mapValues(_.size) // map of nodeName -> number of blacklisted executors
.filter(ele => ele._2 > dupTimes) // keep nodes with more than dupTimes such executors
.keys.toSet
}
}

/**
* Create a BlacklistStrategy instance according to the SparkConf.
*/
private[scheduler] object BlacklistStrategy {
def apply(sparkConf: SparkConf): BlacklistStrategy = {
val timeout = sparkConf.getTimeAsMs("spark.scheduler.blacklist.timeout",
sparkConf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L).toString() + "ms")

if (sparkConf.getBoolean("spark.scheduler.blacklist.advancedStrategy", false)) {
new AdvancedSingleTaskStrategy(timeout)
} else {
new SingleTaskStrategy(timeout)
}
}
}
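For reference, here is a short sketch of how this factory resolves a strategy from configuration. The object name and the conf values are illustrative; only the keys and their defaults come from the code above, and the example lives in the scheduler package only because the factory is private[scheduler].

package org.apache.spark.scheduler

import org.apache.spark.SparkConf

object BlacklistStrategyFactoryExample {
  def main(args: Array[String]): Unit = {
    // Example values; only the keys and their defaults come from the factory above.
    val conf = new SparkConf()
      .set("spark.scheduler.blacklist.timeout", "10min")         // parsed by getTimeAsMs => 600000 ms
      .set("spark.scheduler.blacklist.advancedStrategy", "true") // selects the node-level strategy

    // Resolves to an AdvancedSingleTaskStrategy(600000L). With advancedStrategy left at its
    // default (false) a SingleTaskStrategy is returned, and with no timeout set the legacy
    // spark.scheduler.executorTaskBlacklistTime value (default 0) is used instead.
    val strategy: BlacklistStrategy = BlacklistStrategy(conf)
    println(strategy.expireTimeInMilliseconds)
  }
}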