diff --git a/assembly/pom.xml b/assembly/pom.xml
index 9f93ce160eb50..0955ec28123f9 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/bagel/pom.xml b/bagel/pom.xml
index bafb5f19f00cb..36f464bfd63ad 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/core/pom.xml b/core/pom.xml
index 030e5d7e455e4..dc23da158395c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9385f557c4614..adbd247f3a14d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -27,13 +27,20 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
*
- * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
- * the scheduler queue is not drained in N seconds, then new executors are added. If the queue
- * persists for another M seconds, then more executors are added and so on. The number added
- * in each round increases exponentially from the previous round until an upper bound on the
- * number of executors has been reached. The upper bound is based both on a configured property
- * and on the number of tasks pending: the policy will never increase the number of executor
- * requests past the number needed to handle all pending tasks.
+ * The ExecutorAllocationManager maintains a moving target number of executors which is periodically
+ * synced to the cluster manager. The target starts at a configured initial value and changes with
+ * the number of pending and running tasks.
+ *
+ * Decreasing the target number of executors happens when the current target is more than needed to
+ * handle the current load. The target number of executors is always truncated to the number of
+ * executors that could run all current running and pending tasks at once.
+ *
+ * Increasing the target number of executors happens in response to backlogged tasks waiting to be
+ * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
+ * the queue persists for another M seconds, then more executors are added and so on. The number
+ * added in each round increases exponentially from the previous round until an upper bound has been
+ * reached. The upper bound is based both on a configured property and on the current number of
+ * running and pending tasks, as described above.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
- // Number of executors that have been requested but have not registered yet
- private var numExecutorsPending = 0
+ // The desired number of executors at this moment in time. If all our executors were to die, this
+ // is the number of executors we would immediately want from the cluster manager.
+ private var numExecutorsTarget =
+ conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
// Executors that have been requested to be removed but have not been killed yet
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
executor.awaitTermination(10, TimeUnit.SECONDS)
}
- /**
- * The number of executors we would have if the cluster manager were to fulfill all our existing
- * requests.
- */
- private def targetNumExecutors(): Int =
- numExecutorsPending + executorIds.size - executorsPendingToRemove.size
-
/**
* The maximum number of executors we would need under the current load to satisfy all running
* and pending tasks, rounded up.
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- addOrCancelExecutorRequests(now)
+ updateAndSyncNumExecutorsTarget(now)
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
}
/**
+ * Updates our target number of executors and syncs the result with the cluster manager.
+ *
* Check to see whether our existing allocation and the requests we've made previously exceed our
- * current needs. If so, let the cluster manager know so that it can cancel pending requests that
- * are unneeded.
+ * current needs. If so, truncate our target and let the cluster manager know so that it can
+ * cancel pending requests that are unneeded.
*
* If not, and the add time has expired, see if we can request new executors and refresh the add
* time.
*
* @return the delta in the target number of executors.
*/
- private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
- val currentTarget = targetNumExecutors
+ private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded
- if (maxNeeded < currentTarget) {
+ if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
- // executors and inform the cluster manager to cancel the extra pending requests.
- val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
- client.requestTotalExecutors(newTotalExecutors)
+ // executors and inform the cluster manager to cancel the extra pending requests
+ val oldNumExecutorsTarget = numExecutorsTarget
+ numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
+ client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
- updateNumExecutorsPending(newTotalExecutors)
+ numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
*/
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
- val currentTarget = targetNumExecutors
- if (currentTarget >= maxNumExecutors) {
- logDebug(s"Not adding executors because there are already ${executorIds.size} " +
- s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
+ if (numExecutorsTarget >= maxNumExecutors) {
+ val numExecutorsPending = numExecutorsTarget - executorIds.size
+ logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
+ s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
- val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
- val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
- val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
+ val oldNumExecutorsTarget = numExecutorsTarget
+ // There's no point in wasting time ramping up to the number of executors we already have, so
+ // make sure our target is at least as much as our current allocation:
+ numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+ // Boost our target with the number to add for this round:
+ numExecutorsTarget += numExecutorsToAdd
+ // Ensure that our target doesn't exceed what we need at the present moment:
+ numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
+ // Ensure that our target fits within configured bounds:
+ numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+
+ val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
- val delta = updateNumExecutorsPending(newTotalExecutors)
+ val delta = numExecutorsTarget - oldNumExecutorsTarget
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
- s" (new desired total will be $newTotalExecutors)")
+ s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
} else {
@@ -304,23 +317,11 @@ private[spark] class ExecutorAllocationManager(
delta
} else {
logWarning(
- s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
+ s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
0
}
}
- /**
- * Given the new target number of executors, update the number of pending executor requests,
- * and return the delta from the old number of pending requests.
- */
- private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
- val newNumExecutorsPending =
- newTotalExecutors - executorIds.size + executorsPendingToRemove.size
- val delta = newNumExecutorsPending - numExecutorsPending
- numExecutorsPending = newNumExecutorsPending
- delta
- }
-
/**
* Request the cluster manager to remove the given executor.
* Return whether the request is received.
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
- if (numExecutorsPending > 0) {
- numExecutorsPending -= 1
- logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
- }
} else {
logWarning(s"Duplicate executor $executorId has registered")
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c136584b196a3..86c74f8d1e93f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -701,7 +701,7 @@ private[spark] object Utils extends Logging {
try {
val rootDir = new File(root)
if (rootDir.exists || rootDir.mkdirs()) {
- val dir = createDirectory(root)
+ val dir = createTempDir(root)
chmod700(dir)
Some(dir.getAbsolutePath)
} else {
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ded1e4af8742..114c8d4e1c7db 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -84,7 +84,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
test("starting state") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 1)
assert(executorsPendingToRemove(manager).isEmpty)
assert(executorIds(manager).isEmpty)
assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
@@ -97,108 +97,108 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
// Keep adding until the limit is reached
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 2)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 4)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 4)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 8)
- assert(addExecutors(manager) === 3) // reached the limit of 10
- assert(numExecutorsPending(manager) === 10)
+ assert(addExecutors(manager) === 2) // reached the limit of 10
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 10)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
// Register previously requested executors
onExecutorAdded(manager, "first")
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
onExecutorAdded(manager, "second")
onExecutorAdded(manager, "third")
onExecutorAdded(manager, "fourth")
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
onExecutorAdded(manager, "first") // duplicates should not count
onExecutorAdded(manager, "second")
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
// Try adding again
// This should still fail because the number pending + running is still at the limit
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 6)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
}
test("add executors capped by num pending tasks") {
- sc = createSparkContext(1, 10)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))
// Verify that we're capped at number of tasks in the stage
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 5)
+ assert(numExecutorsTarget(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)
- // Verify that running a task reduces the cap
+ // Verify that running a task doesn't affect the target
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerExecutorAdded(
0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
- assert(numExecutorsPending(manager) === 4)
+ assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 5)
+ assert(numExecutorsTarget(manager) === 6)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 8)
assert(numExecutorsToAdd(manager) === 1)
- // Verify that re-running a task doesn't reduce the cap further
+ // Verify that re-running a task doesn't blow things up
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 8)
+ assert(numExecutorsTarget(manager) === 9)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task once we're at our limit doesn't blow things up
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
assert(addExecutors(manager) === 0)
- assert(numExecutorsPending(manager) === 9)
+ assert(numExecutorsTarget(manager) === 10)
}
test("cancel pending executors when no longer needed") {
- sc = createSparkContext(1, 10)
+ sc = createSparkContext(0, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
- assert(numExecutorsPending(manager) === 1)
+ assert(numExecutorsTarget(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
- assert(numExecutorsPending(manager) === 3)
+ assert(numExecutorsTarget(manager) === 3)
val task1Info = createTaskInfo(0, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info))
@@ -272,7 +272,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
// Add a few executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
- assert(addExecutors(manager) === 4)
onExecutorAdded(manager, "1")
onExecutorAdded(manager, "2")
onExecutorAdded(manager, "3")
@@ -280,55 +279,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
onExecutorAdded(manager, "5")
onExecutorAdded(manager, "6")
onExecutorAdded(manager, "7")
- assert(executorIds(manager).size === 7)
+ onExecutorAdded(manager, "8")
+ assert(executorIds(manager).size === 8)
// Remove until limit
assert(removeExecutor(manager, "1"))
assert(removeExecutor(manager, "2"))
- assert(!removeExecutor(manager, "3")) // lower limit reached
- assert(!removeExecutor(manager, "4"))
+ assert(removeExecutor(manager, "3"))
+ assert(!removeExecutor(manager, "4")) // lower limit reached
+ assert(!removeExecutor(manager, "5"))
onExecutorRemoved(manager, "1")
onExecutorRemoved(manager, "2")
+ onExecutorRemoved(manager, "3")
assert(executorIds(manager).size === 5)
// Add until limit
- assert(addExecutors(manager) === 5) // upper limit reached
+ assert(addExecutors(manager) === 2) // upper limit reached
assert(addExecutors(manager) === 0)
- assert(!removeExecutor(manager, "3")) // still at lower limit
- assert(!removeExecutor(manager, "4"))
- onExecutorAdded(manager, "8")
+ assert(!removeExecutor(manager, "4")) // still at lower limit
+ assert(!removeExecutor(manager, "5"))
onExecutorAdded(manager, "9")
onExecutorAdded(manager, "10")
onExecutorAdded(manager, "11")
onExecutorAdded(manager, "12")
+ onExecutorAdded(manager, "13")
assert(executorIds(manager).size === 10)
// Remove succeeds again, now that we are no longer at the lower limit
- assert(removeExecutor(manager, "3"))
assert(removeExecutor(manager, "4"))
assert(removeExecutor(manager, "5"))
assert(removeExecutor(manager, "6"))
+ assert(removeExecutor(manager, "7"))
assert(executorIds(manager).size === 10)
- assert(addExecutors(manager) === 1)
- onExecutorRemoved(manager, "3")
+ assert(addExecutors(manager) === 0)
onExecutorRemoved(manager, "4")
+ onExecutorRemoved(manager, "5")
assert(executorIds(manager).size === 8)
- // Add succeeds again, now that we are no longer at the upper limit
- // Number of executors added restarts at 1
- assert(addExecutors(manager) === 2)
- assert(addExecutors(manager) === 1) // upper limit reached
+ // Number of executors pending restarts at 1
+ assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 0)
assert(executorIds(manager).size === 8)
- onExecutorRemoved(manager, "5")
onExecutorRemoved(manager, "6")
- onExecutorAdded(manager, "13")
+ onExecutorRemoved(manager, "7")
onExecutorAdded(manager, "14")
+ onExecutorAdded(manager, "15")
assert(executorIds(manager).size === 8)
assert(addExecutors(manager) === 0) // still at upper limit
- onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
+ onExecutorAdded(manager, "17")
assert(executorIds(manager).size === 10)
+ assert(numExecutorsTarget(manager) === 10)
}
test("starting/canceling add timer") {
@@ -411,33 +412,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
}
test("mock polling loop with no events") {
- sc = createSparkContext(1, 20)
+ sc = createSparkContext(0, 20)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(2020L)
manager.setClock(clock)
// No events - we should not be adding or removing
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(100L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(1000L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
clock.advance(10000L)
schedule(manager)
- assert(numExecutorsPending(manager) === 0)
+ assert(numExecutorsTarget(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
}
test("mock polling loop add behavior") {
- sc = createSparkContext(1, 20)
+ sc = createSparkContext(0, 20)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -447,43 +448,43 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
onSchedulerBacklogged(manager)
clock.advance(schedulerBacklogTimeout * 1000 / 2)
schedule(manager)
- assert(numExecutorsPending(manager) === 0) // timer not exceeded yet
+ assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet
clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1) // first timer exceeded
+ assert(numExecutorsTarget(manager) === 1) // first timer exceeded
clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2)
schedule(manager)
- assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet
+ assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded
+ assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded
+ assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded
// Scheduler queue drained
onSchedulerQueueEmpty(manager)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7) // timer is canceled
+ assert(numExecutorsTarget(manager) === 7) // timer is canceled
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7)
+ assert(numExecutorsTarget(manager) === 7)
// Scheduler queue backlogged again
onSchedulerBacklogged(manager)
clock.advance(schedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1) // timer restarted
+ assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1 + 2)
+ assert(numExecutorsTarget(manager) === 7 + 1 + 2)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4)
+ assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4)
clock.advance(sustainedSchedulerBacklogTimeout * 1000)
schedule(manager)
- assert(numExecutorsPending(manager) === 20) // limit reached
+ assert(numExecutorsTarget(manager) === 20) // limit reached
}
test("mock polling loop remove behavior") {
@@ -677,6 +678,31 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
assert(!removeTimes(manager).contains("executor-1"))
}
+ test("avoid ramp up when target < running executors") {
+ sc = createSparkContext(0, 100000)
+ val manager = sc.executorAllocationManager.get
+ val stage1 = createStageInfo(0, 1000)
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+
+ assert(addExecutors(manager) === 1)
+ assert(addExecutors(manager) === 2)
+ assert(addExecutors(manager) === 4)
+ assert(addExecutors(manager) === 8)
+ assert(numExecutorsTarget(manager) === 15)
+ (0 until 15).foreach { i =>
+ onExecutorAdded(manager, s"executor-$i")
+ }
+ assert(executorIds(manager).size === 15)
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+
+ adjustRequestedExecutors(manager)
+ assert(numExecutorsTarget(manager) === 0)
+
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
+ addExecutors(manager)
+ assert(numExecutorsTarget(manager) === 16)
+ }
+
private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
@@ -718,7 +744,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
* ------------------------------------------------------- */
private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
- private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending)
+ private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)
private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded)
private val _executorsPendingToRemove =
PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
@@ -727,7 +753,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes)
private val _schedule = PrivateMethod[Unit]('schedule)
private val _addExecutors = PrivateMethod[Int]('addExecutors)
- private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests)
+ private val _updateAndSyncNumExecutorsTarget =
+ PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
@@ -740,8 +767,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _numExecutorsToAdd()
}
- private def numExecutorsPending(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _numExecutorsPending()
+ private def numExecutorsTarget(manager: ExecutorAllocationManager): Int = {
+ manager invokePrivate _numExecutorsTarget()
}
private def executorsPendingToRemove(
@@ -771,7 +798,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
}
private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = {
- manager invokePrivate _addOrCancelExecutorRequests(0L)
+ manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
}
private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = {
diff --git a/examples/pom.xml b/examples/pom.xml
index b160fd5824277..da8df5adefc1d 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 274210f7e3da1..bdd077f64227d 100644
--- a/external/flume-sink/pom.xml
+++ b/external/flume-sink/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 5c03ebceac99a..8822a27b66cf4 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml
index af188e87c206c..f56dd00f65447 100644
--- a/external/kafka-assembly/pom.xml
+++ b/external/kafka-assembly/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index a334c6272c157..0877a9b9dc7fc 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 853ffa3781594..3672039bc85c1 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 2dfd6c3163397..b757faa9d91a4 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index 7b3fad6a05048..151c94f43c5b0 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
index d67dc818249e6..f47524e6585f3 100644
--- a/extras/java8-tests/pom.xml
+++ b/extras/java8-tests/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml
index 7c2da9ccdb83f..d37e1c349ff85 100644
--- a/extras/kinesis-asl/pom.xml
+++ b/extras/kinesis-asl/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
index fbcbda9922b8d..66bb68d15819a 100644
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ b/extras/spark-ganglia-lgpl/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/graphx/pom.xml b/graphx/pom.xml
index f9ae287b686d3..1c44c938b37bb 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 3f976de9213bf..ca459d5751784 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 6663d3672a379..927fb5fc13731 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml
index b56eb810288ff..63485bfe7c3d7 100644
--- a/network/shuffle/pom.xml
+++ b/network/shuffle/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml
index 7a52cf7e03695..7af01f92ea46d 100644
--- a/network/yarn/pom.xml
+++ b/network/yarn/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/pom.xml b/pom.xml
index 59813ee0a27c6..f2ca642c5c94a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-parent_2.10</artifactId>
-  <version>1.3.1-palantir2</version>
+  <version>1.3.1-palantir3</version>
   <packaging>pom</packaging>
   <name>Spark Project Parent POM</name>
   <url>http://spark.apache.org/</url>
diff --git a/repl/pom.xml b/repl/pom.xml
index 9b37f6a130b3b..63089298c9680 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 9e4ac9542e5b8..a107435aa1bab 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index b3325ede47472..5d2bc209a1954 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 1a1c1669856c5..ddfe31a61cf7e 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 6b91727606959..30f6e6c869ef3 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../../pom.xml</relativePath>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 5cd9f30fbe765..026fe72a77731 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/tools/pom.xml b/tools/pom.xml
index 500e1a12fb07f..f02edb155a87e 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index e23f9280468ce..a8e0210a09a45 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -20,7 +20,7 @@
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.3.1-palantir2</version>
+    <version>1.3.1-palantir3</version>
     <relativePath>../pom.xml</relativePath>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index c98763e15b58f..2c4ff9df3e6ec 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -373,7 +373,9 @@ private[yarn] class YarnAllocator(
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+ if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
+ logInfo("Container preempted: " + containerId)
+ } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
logWarning(memLimitExceededLogMessage(
completedContainer.getDiagnostics,
VMEM_EXCEEDED_PATTERN))