@@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

private def totalRunningTasks(): Int = synchronized {
Contributor: Looks like no one invokes this method?

Author: This is being called from the test.

Contributor: I'm not sure why we need to add a method that is only used by unit tests. If you want to verify the behavior of totalRunningTasks, I think maxNumExecutorsNeeded can also be used indirectly for verification.

Author: It's okay to add a method that is used for unit-testing purposes only. I am not inclined towards using maxNumExecutorsNeeded to indirectly verify totalRunningTasks, for the following reason:

Currently, the test case is testing exactly what it is supposed to. If it checked maxNumExecutorsNeeded instead, it might not be clear what we are testing.
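
For reference, a minimal standalone sketch of how a private, test-only accessor like this can be reached from a suite via ScalaTest's PrivateMethodTester, mirroring the helper added to ExecutorAllocationManagerSuite below. The Manager class here is a stand-in, not the real ExecutorAllocationManager:

```scala
import org.scalatest.PrivateMethodTester

// Standalone sketch of the PrivateMethodTester pattern used in the suite below.
object PrivateAccessorSketch extends PrivateMethodTester {

  class Manager {
    private def totalRunningTasks(): Int = 0 // stand-in for the real bookkeeping
  }

  private val _totalRunningTasks = PrivateMethod[Int]('totalRunningTasks)

  def main(args: Array[String]): Unit = {
    val manager = new Manager
    // invokePrivate reflects into the private zero-argument method by name.
    assert((manager invokePrivate _totalRunningTasks()) == 0)
  }
}
```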

listener.totalRunningTasks
}

/**
* This is called at a fixed interval to regulate the number of pending executor requests
* and number of executors running.
@@ -602,12 +606,11 @@ private[spark] class ExecutorAllocationManager(
private class ExecutorAllocationListener extends SparkListener {

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
// Number of running tasks per stage including speculative tasks.
// Should be 0 when no stages are active.
private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster including speculative tasks.
// Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Number of speculative tasks to be scheduled in each stage
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
// The speculative tasks started in each stage
@@ -625,6 +628,7 @@
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
stageIdToNumTasks(stageId) = numTasks
stageIdToNumRunningTask(stageId) = 0
allocationManager.onSchedulerBacklogged()

// Compute the number of tasks requested by the stage on each host
@@ -651,6 +655,7 @@
val stageId = stageCompleted.stageInfo.stageId
allocationManager.synchronized {
stageIdToNumTasks -= stageId
stageIdToNumRunningTask -= stageId
stageIdToNumSpeculativeTasks -= stageId
stageIdToTaskIndices -= stageId
stageIdToSpeculativeTaskIndices -= stageId
@@ -663,10 +668,6 @@
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
numRunningTasks = 0
}
}
}
}
@@ -678,7 +679,9 @@
val executorId = taskStart.taskInfo.executorId

allocationManager.synchronized {
numRunningTasks += 1
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) += 1
}
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -709,7 +712,9 @@
val taskIndex = taskEnd.taskInfo.index
val stageId = taskEnd.stageId
allocationManager.synchronized {
numRunningTasks -= 1
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) -= 1
}
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
@@ -787,7 +792,9 @@
/**
* The number of tasks currently running across all stages.
*/
def totalRunningTasks(): Int = numRunningTasks
def totalRunningTasks(): Int = {
stageIdToNumRunningTask.values.sum
Contributor: This needs to be inside allocationManager.synchronized, no?

Contributor: Never mind, this is called from a synchronized context. Except in your unit tests, that is (which call the private totalRunningTasks you added to the manager).

Contributor: It'd be nice to make the other method that calls this synchronized, just to be paranoid.
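
A minimal, self-contained sketch of the locking pattern being discussed (a model with stand-in classes, not the actual Spark code): the listener-level sum stays unsynchronized and relies on its callers holding the manager's lock, while the manager-level accessor takes that lock itself before delegating:

```scala
import scala.collection.mutable

// Model of the two call paths: Listener.totalRunningTasks assumes the caller holds
// the manager's lock; Manager.totalRunningTasks takes the lock before delegating.
object SynchronizedReadSketch {

  class Listener {
    val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
    // Unsynchronized read; callers are expected to hold the manager's lock.
    def totalRunningTasks(): Int = stageIdToNumRunningTask.values.sum
  }

  class Manager {
    private val listener = new Listener
    // Accessor used from tests: serializes the read with listener updates.
    def totalRunningTasks(): Int = synchronized { listener.totalRunningTasks() }
  }

  def main(args: Array[String]): Unit = {
    assert(new Manager().totalRunningTasks() == 0)
  }
}
```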

}

/**
* Return true if an executor is not currently running a task, and false otherwise.
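The listener changes above replace the single global numRunningTasks counter with per-stage counts: a task-end event that arrives after its stage has already completed is simply ignored instead of driving the total negative. A minimal, Spark-free sketch of that bookkeeping (names mirror the diff; this is a model, not the listener itself):

```scala
import scala.collection.mutable

// Spark-free model of the per-stage running-task bookkeeping introduced above.
object RunningTaskBookkeepingSketch {

  private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]

  def stageSubmitted(stageId: Int): Unit = stageIdToNumRunningTask(stageId) = 0
  def stageCompleted(stageId: Int): Unit = stageIdToNumRunningTask -= stageId

  def taskStart(stageId: Int): Unit =
    if (stageIdToNumRunningTask.contains(stageId)) stageIdToNumRunningTask(stageId) += 1

  def taskEnd(stageId: Int): Unit =
    if (stageIdToNumRunningTask.contains(stageId)) stageIdToNumRunningTask(stageId) -= 1

  def totalRunningTasks: Int = stageIdToNumRunningTask.values.sum

  def main(args: Array[String]): Unit = {
    stageSubmitted(0)
    taskStart(0); taskStart(0)
    stageCompleted(0)          // the stage finishes while two tasks are still marked running
    taskEnd(0); taskEnd(0)     // their late task-end events are now ignored
    assert(totalRunningTasks == 0) // the old global counter would have ended at -2 here
  }
}
```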
@@ -227,6 +227,23 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
}

test("ignore task end events from completed stages") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
val stage = createStageInfo(0, 5)
post(sc.listenerBus, SparkListenerStageSubmitted(stage))
val taskInfo1 = createTaskInfo(0, 0, "executor-1")
val taskInfo2 = createTaskInfo(1, 1, "executor-1")
post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo2))

post(sc.listenerBus, SparkListenerStageCompleted(stage))

post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
assert(totalRunningTasks(manager) === 0)
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
@@ -1107,6 +1124,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
private val _totalRunningTasks = PrivateMethod[Int]('totalRunningTasks)

private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _numExecutorsToAdd()
@@ -1190,6 +1208,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _localityAwareTasks()
}

private def totalRunningTasks(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _totalRunningTasks()
}

private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
manager invokePrivate _hostToLocalTaskCount()
}