@@ -501,11 +501,16 @@ private[spark] class ExecutorAllocationManager(
*/
private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
// Increase maxNumExecutors by the number of excluded executors so that the manager can
// launch new executors to replace the excluded ones.
val exclude = executorMonitor.excludedExecutorCount
val maxOverheadExecutors = maxNumExecutors + exclude
Contributor:
So I don't agree with this, at least not how it's defined. The user defined the maximum number of executors to use, and this is requesting more than that. I realize that some are excluded, but this also comes down to a resource utilization question. If I am in a multi-tenant environment, I want to make sure one job doesn't take over the entire cluster, and max is one way to do this. I think we would either need to redefine max, which isn't great for backwards compatibility and could result in unexpected behavior, or add another config around the excluded nodes: either a flag that allows going over the max, or a setting that allows going over by X. The downside is that the default would be 0 or false, so you would have to configure it if you set max and want this feature. But I don't see a lot of jobs setting max unless they are trying to be nice in a multi-tenant cluster, so that seems OK as long as it's in the release notes, etc.

You will notice the other logic, for unschedulableTaskSets, does not increase the max; it just increases the number of executors we ask for.

Member Author:
Makes sense to me. Adding an extra conf would be a good choice.

Although, I'm rethinking this change. It only takes effect when users set the max explicitly and the cluster reaches the max (by default, max is Int.MaxValue, so we won't normally reach it). However, we still want to replace excluded executors even when the cluster doesn't reach the max. For example, max/2 executors may be enough for task scheduling, and the TaskScheduler also thinks there are max/2 executors without realizing that X of them are actually excluded.

So I think what we actually need here is to forcibly replace excluded executors when dynamic allocation and exclusion (but not kill) are both enabled, and this should not be tied to the max value.
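For illustration only, a minimal sketch of what the extra conf suggested above might look like if it were defined alongside the existing dynamic-allocation entries in org.apache.spark.internal.config; the key name, default, and version are assumptions, not something this PR actually proposes:

    // Hypothetical config entry (name and default are assumptions): extra headroom above
    // spark.dynamicAllocation.maxExecutors that may only be used to replace executors
    // excluded by the HealthTracker; the default of 0 keeps today's behavior.
    private[spark] val DYN_ALLOCATION_MAX_EXECUTORS_EXCLUDE_OVERHEAD =
      ConfigBuilder("spark.dynamicAllocation.maxExecutorsExcludeOverhead")
        .doc("Extra executors allowed above spark.dynamicAllocation.maxExecutors " +
          "to replace excluded executors.")
        .version("3.2.0")
        .intConf
        .createWithDefault(0)

With such an entry, addExecutors could cap the headroom with maxNumExecutors + math.min(exclude, conf.get(DYN_ALLOCATION_MAX_EXECUTORS_EXCLUDE_OVERHEAD)) instead of the unconditional maxNumExecutors + exclude in this diff.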

// Do not request more executors if it would put our target over the upper bound
// this is doing a max check per ResourceProfile
if (oldNumExecutorsTarget >= maxNumExecutors) {
if (oldNumExecutorsTarget >= maxOverheadExecutors) {
logDebug("Not adding executors because our current target total " +
s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
s"is already ${oldNumExecutorsTarget} (limit $maxOverheadExecutors " +
s"${if (exclude > 0) s"($exclude exclude)" else ""})")
numExecutorsToAddPerResourceProfileId(rpId) = 1
return 0
}
@@ -518,7 +523,8 @@
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
numExecutorsTarget = math.max(
math.min(numExecutorsTarget, maxOverheadExecutors), minNumExecutors)
val delta = numExecutorsTarget - oldNumExecutorsTarget
numExecutorsTargetPerResourceProfileId(rpId) = numExecutorsTarget

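In effect, with the numbers used in the new test below (maxNumExecutors = 4 and one excluded executor), maxOverheadExecutors becomes 5, so addExecutors can still raise the target by one to replace the excluded executor even though the old target already equals the user-configured max.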
@@ -52,6 +52,11 @@ private[spark] class ExecutorMonitor(
private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)

// We only track the `excluded` status of an executor when the exclusion feature is enabled
// but killing excluded executors is disabled.
private val exclusionButNotKillEnabled = conf.get(EXCLUDE_ON_FAILURE_ENABLED).getOrElse(false) &&
!conf.get(EXCLUDE_ON_FAILURE_KILL_ENABLED)

private val executors = new ConcurrentHashMap[String, Tracker]()
private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]()

@@ -166,6 +171,7 @@ private[spark] class ExecutorMonitor(

def executorCount: Int = executors.size()

// Executors that are available for running tasks; excluded executors are not counted here.
def executorCountWithResourceProfile(id: Int): Int = {
execResourceProfileCount.getOrDefault(id, 0)
}
@@ -180,6 +186,14 @@
}
}

def excludedExecutorCount: Int = {
if (exclusionButNotKillEnabled) {
executors.values().asScala.count(_.excluded)
} else {
0
}
}

def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }

def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
@@ -307,7 +321,8 @@
val executorId = event.taskInfo.executorId
// Guard against a late arriving task start event (SPARK-26927).
if (client.isExecutorActive(executorId)) {
val exec = ensureExecutorIsTracked(executorId, UNKNOWN_RESOURCE_PROFILE_ID)
val exec = ensureExecutorIsTracked(
executorId, event.taskInfo.host, UNKNOWN_RESOURCE_PROFILE_ID)
exec.updateRunningTasks(1)
}
}
@@ -337,7 +352,8 @@
}

override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId)
val exec = ensureExecutorIsTracked(
event.executorId, event.executorInfo.executorHost, event.executorInfo.resourceProfileId)
exec.updateRunningTasks(0)
logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})")
}
@@ -348,6 +364,11 @@
execResourceProfileCount.remove(rpId, 0)
}

private def incrementExecResourceProfileCount(rpId: Int): Unit = {
val count = execResourceProfileCount.computeIfAbsent(rpId, _ => 0)
execResourceProfileCount.replace(rpId, count, count + 1)
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val removed = executors.remove(event.executorId)
if (removed != null) {
@@ -359,7 +380,9 @@
}

override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
val exec = ensureExecutorIsTracked(
event.blockUpdatedInfo.blockManagerId.executorId,
event.blockUpdatedInfo.blockManagerId.host,
UNKNOWN_RESOURCE_PROFILE_ID)

// Check if it is a shuffle file, or RDD to pick the correct codepath for update
@@ -418,6 +441,42 @@
}
}

override def onExecutorExcluded(executorExcluded: SparkListenerExecutorExcluded): Unit = {
if (exclusionButNotKillEnabled) {
Option(executors.get(executorExcluded.executorId)).foreach { exec =>
exec.excluded = true
decrementExecResourceProfileCount(exec.resourceProfileId)
}
}
}

override def onExecutorUnexcluded(executorUnexcluded: SparkListenerExecutorUnexcluded): Unit = {
if (exclusionButNotKillEnabled) {
Option(executors.get(executorUnexcluded.executorId)).foreach { exec =>
exec.excluded = false
incrementExecResourceProfileCount(exec.resourceProfileId)
}
}
}

override def onNodeExcluded(nodeExcluded: SparkListenerNodeExcluded): Unit = {
if (exclusionButNotKillEnabled) {
executors.values().asScala.filter(_.host == nodeExcluded.hostId).foreach { exec =>
exec.excluded = true
decrementExecResourceProfileCount(exec.resourceProfileId)
}
}
}

override def onNodeUnexcluded(nodeUnexcluded: SparkListenerNodeUnexcluded): Unit = {
if (exclusionButNotKillEnabled) {
executors.values().asScala.filter(_.host == nodeUnexcluded.hostId).foreach { exec =>
exec.excluded = false
incrementExecResourceProfileCount(exec.resourceProfileId)
}
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case ShuffleCleanedEvent(id) => cleanupShuffle(id)
case _ =>
@@ -467,23 +526,21 @@
* which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
* event, which is possible because these events are posted in different threads. (see SPARK-4951)
*/
private def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker = {
val numExecsWithRpId = execResourceProfileCount.computeIfAbsent(resourceProfileId, _ => 0)
private def ensureExecutorIsTracked(id: String, host: String, resourceProfileId: Int): Tracker = {
val execTracker = executors.computeIfAbsent(id, _ => {
val newcount = numExecsWithRpId + 1
execResourceProfileCount.put(resourceProfileId, newcount)
logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
s"count is now $newcount")
new Tracker(resourceProfileId)
})
incrementExecResourceProfileCount(resourceProfileId)
logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
s"count is now ${execResourceProfileCount.get(resourceProfileId)}")
new Tracker(resourceProfileId, host)
})
// if we had added executor before without knowing the resource profile id, fix it up
if (execTracker.resourceProfileId == UNKNOWN_RESOURCE_PROFILE_ID &&
resourceProfileId != UNKNOWN_RESOURCE_PROFILE_ID) {
logDebug(s"Executor: $id, resource profile id was unknown, setting " +
s"it to $resourceProfileId")
execTracker.resourceProfileId = resourceProfileId
// fix up the counts for each resource profile id
execResourceProfileCount.put(resourceProfileId, numExecsWithRpId + 1)
incrementExecResourceProfileCount(resourceProfileId)
decrementExecResourceProfileCount(UNKNOWN_RESOURCE_PROFILE_ID)
}
execTracker
@@ -506,7 +563,7 @@
}
}

private class Tracker(var resourceProfileId: Int) {
private class Tracker(var resourceProfileId: Int, val host: String) {
@volatile var timeoutAt: Long = Long.MaxValue

// Tracks whether this executor is thought to be timed out. It's used to detect when the list
@@ -516,6 +573,8 @@
var pendingRemoval: Boolean = false
var decommissioning: Boolean = false
var hasActiveShuffle: Boolean = false
// whether the executor is temporarily excluded by the `HealthTracker`
Contributor:
We should expand this comment to state that the executor is excluded for the entire application (I realize HealthTracker implies this, but I would like to be more explicit); it does not include executors excluded only within a stage.
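A possible expanded wording (a suggestion only, not part of this diff):

    // Whether the executor is excluded by the `HealthTracker` for the entire application.
    // Executors excluded only within a stage are intentionally not tracked here.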

var excluded: Boolean = false

private var idleStart: Long = -1
private var runningTasks: Int = 0
@@ -1648,11 +1648,45 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
}

test("SPARK-33799: excluded executors should not be considered as available executors") {
val manager = createManager(createConf(1, 4, 1,
exclusionEnabled = true))
post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
onExecutorAdded(manager, "executor-0", defaultProfile)

val updatesNeeded =
new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]

// Keep adding until the limit is reached
assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
assert(numExecutorsToAddForDefaultProfile(manager) === 1)
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
onExecutorAdded(manager, "executor-1", defaultProfile)

assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
assert(numExecutorsToAddForDefaultProfile(manager) === 2)
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
onExecutorAdded(manager, "executor-2", defaultProfile)
onExecutorAdded(manager, "executor-3", defaultProfile)

assert(numExecutorsTargetForDefaultProfileId(manager) === 4)
assert(numExecutorsToAddForDefaultProfile(manager) === 4)
// we've reached maxExecutors, so no more executors will be added
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)

// mark executor-0 as excluded, so the manager will launch a new executor to replace it
post(SparkListenerExecutorExcluded(clock.getTimeMillis(), "executor-0", 1))
assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
}

private def createConf(
minExecutors: Int = 1,
maxExecutors: Int = 5,
initialExecutors: Int = 1,
decommissioningEnabled: Boolean = false): SparkConf = {
decommissioningEnabled: Boolean = false,
exclusionEnabled: Boolean = false): SparkConf = {
val sparkConf = new SparkConf()
.set(config.DYN_ALLOCATION_ENABLED, true)
.set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
@@ -1670,6 +1704,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
// and thread "pool-1-thread-1-ScalaTest-running".
.set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
.set(DECOMMISSION_ENABLED, decommissioningEnabled)
.set(config.EXCLUDE_ON_FAILURE_ENABLED, exclusionEnabled)
sparkConf
}

@@ -433,6 +433,58 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
}

test("SPARK-33799: ExecutorMonitor should handle " +
"SparkListenerExecutorExcluded/SparkListenerExecutorUnexcluded") {
conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
.set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 1)

// the excluded executor should not be counted
monitor.onExecutorExcluded(SparkListenerExecutorExcluded(clock.getTimeMillis(), "1", 1))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 0)

// the unexcluded executor can be counted again
monitor.onExecutorUnexcluded(SparkListenerExecutorUnexcluded(clock.getTimeMillis(), "1"))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 1)
}

test("SPARK-33799: ExecutorMonitor should handle " +
"SparkListenerNodeExcluded/SparkListenerNodeUnexcluded") {
conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
.set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 2)

// executors on the excluded node should not be counted
monitor.onNodeExcluded(
SparkListenerNodeExcluded(clock.getTimeMillis(), execInfo.executorHost, 1))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 0)

// executors on the unexcluded node can be counted again
monitor.onNodeUnexcluded(
SparkListenerNodeUnexcluded(clock.getTimeMillis(), execInfo.executorHost))
assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 2)
assert(monitor.executorCount === 2)
}

test("SPARK-33799: Excluded executor can still be timedout") {
conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
.set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
monitor = new ExecutorMonitor(conf, client, null, clock)
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
assert(monitor.executorCount === 1)
// mark the executor as excluded
monitor.onExecutorExcluded(SparkListenerExecutorExcluded(clock.getTimeMillis(), "1", 1))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 0)
assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
}

private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1