diff --git a/assembly/pom.xml b/assembly/pom.xml index ba233e70a29f5..96824e04d52ce 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/bagel/pom.xml b/bagel/pom.xml index c5e9183ddc945..926d3f1727126 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/core/pom.xml b/core/pom.xml index f0d236df406c1..47eb316b2b013 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 6909015ff66e6..3183094e567e8 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -132,7 +132,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext) // Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - sc.killExecutor(executorId) + // Note: we want to get an executor back after expiring this one, + // so do not simply call `sc.killExecutor` here (SPARK-8119) + sc.killAndReplaceExecutor(executorId) } }) } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d499aba7905ed..c5899921d77d0 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1409,6 +1409,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: * Request that the cluster manager kill the specified executors. 
+ * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. If the application wishes to replace the executors it kills + * through this method with new ones, it should follow up explicitly with a call to + * [[SparkContext#requestExecutors]]. + * + * This is currently only supported in YARN mode. Return whether the request is received. */ @DeveloperApi @@ -1426,12 +1432,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli /** * :: DeveloperApi :: - * Request that cluster manager the kill the specified executor. - * This is currently only supported in Yarn mode. Return whether the request is received. + * Request that the cluster manager kill the specified executor. + * + * Note: This is an indication to the cluster manager that the application wishes to adjust + * its resource usage downwards. If the application wishes to replace the executor it kills + * through this method with a new one, it should follow up explicitly with a call to + * [[SparkContext#requestExecutors]]. + * + * This is currently only supported in YARN mode. Return whether the request is received. */ @DeveloperApi override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) + /** + * Request that the cluster manager kill the specified executor without adjusting the + * application resource requirements. + * + * The effect is that a new executor will be launched in place of the one killed by + * this request. This assumes the cluster manager will automatically and eventually + * fulfill all missing application resource requests. + * + * Note: The replace is by no means guaranteed; another application on the same cluster + * can steal the window of opportunity and acquire this application's resources in the + * meantime. + * + * This is currently only supported in YARN mode. Return whether the request is received. 
+ */ + private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { + schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutors(Seq(executorId), replace = true) + case _ => + logWarning("Killing executors is only supported in coarse-grained mode") + false + } + } + /** The version of Spark on which this application is running. */ def version: String = SPARK_VERSION diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala index 8f01174f1b2d8..42910104cda55 100644 --- a/core/src/main/scala/org/apache/spark/package.scala +++ b/core/src/main/scala/org/apache/spark/package.scala @@ -43,5 +43,5 @@ package org.apache package object spark { // For package docs only - val SPARK_VERSION = "1.4.1" + val SPARK_VERSION = "1.4.1-palantir1" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ab626becf0273..4c00e0924e445 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -367,26 +367,36 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp /** * Request that the cluster manager kill the specified executors. - * Return whether the kill request is acknowledged. + * @return whether the kill request is acknowledged. */ final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized { + killExecutors(executorIds, replace = false) + } + + /** + * Request that the cluster manager kill the specified executors. + * + * @param executorIds identifiers of executors to kill + * @param replace whether to replace the killed executors with new ones + * @return whether the kill request is acknowledged. 
+ */ + final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized { logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}") - val filteredExecutorIds = new ArrayBuffer[String] - executorIds.foreach { id => - if (executorDataMap.contains(id)) { - filteredExecutorIds += id - } else { - logWarning(s"Executor to kill $id does not exist!") - } + val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains) + unknownExecutors.foreach { id => + logWarning(s"Executor to kill $id does not exist!") + } + + // If we do not wish to replace the executors we kill, sync the target number of executors + // with the cluster manager to avoid allocating new ones. When computing the new target, + // take into account executors that are pending to be added or removed. + if (!replace) { + doRequestTotalExecutors(numExistingExecutors + numPendingExecutors + - executorsPendingToRemove.size - knownExecutors.size) } - // Killing executors means effectively that we want less executors than before, so also update - // the target number of executors to avoid having the backend allocate new ones. 
- val newTotal = (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size - - filteredExecutorIds.size) - doRequestTotalExecutors(newTotal) - executorsPendingToRemove ++= filteredExecutorIds - doKillExecutors(filteredExecutorIds) + executorsPendingToRemove ++= knownExecutors + doKillExecutors(knownExecutors) } /** diff --git a/examples/pom.xml b/examples/pom.xml index e9a9cc2ebe276..79fc09057aa76 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 7eae7a7a846a1..07117ee80998e 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/flume/pom.xml b/external/flume/pom.xml index b3ad09adae008..f0775a205dfb2 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/kafka-assembly/pom.xml b/external/kafka-assembly/pom.xml index c05bd1bad0d82..201ecf8282e42 100644 --- a/external/kafka-assembly/pom.xml +++ b/external/kafka-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index c8f844084ce1f..234063d758597 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index 908bd6378ed30..a58a85e42d67f 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 68e67b8a7c3b6..39ece3f352931 100644 --- 
a/external/twitter/pom.xml +++ b/external/twitter/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index e12a87370c3f9..2d3ada5cce70c 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml index f57dadb367d1e..9c6a6e56129a6 100644 --- a/extras/java8-tests/pom.xml +++ b/extras/java8-tests/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index 8fa488cb625d3..b9ce153ad5449 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml index a9f1fad415fb8..1db061c748a6e 100644 --- a/extras/spark-ganglia-lgpl/pom.xml +++ b/extras/spark-ganglia-lgpl/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/graphx/pom.xml b/graphx/pom.xml index 901ec740f6b68..e431dfa8930f5 100644 --- a/graphx/pom.xml +++ b/graphx/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/launcher/pom.xml b/launcher/pom.xml index 9804ca700c811..561d63feed3e9 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/mllib/pom.xml b/mllib/pom.xml index 911c8b4d8f0d7..329a668dbb611 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/network/common/pom.xml b/network/common/pom.xml index 
7ee504b929c37..9e58bd224eeec 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index 48837df6e6205..99972f4a49137 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/network/yarn/pom.xml b/network/yarn/pom.xml index 3847a34162a4a..2b2a7a737c900 100644 --- a/network/yarn/pom.xml +++ b/network/yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/pom.xml b/pom.xml index 91e25c4b5417c..9d75aafbdfdc9 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 pom Spark Project Parent POM http://spark.apache.org/ diff --git a/repl/pom.xml b/repl/pom.xml index e9b833bae88a7..c6b9aa85c3180 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml index d33b9eeb5a7fa..8372520502992 100644 --- a/sql/catalyst/pom.xml +++ b/sql/catalyst/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/sql/core/pom.xml b/sql/core/pom.xml index b5121059a8167..8d6c50e7423e2 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 81aa1376734ac..bc6a9c44b2be5 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index a2ee048ec564c..226448dca8c0f 100644 --- 
a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../../pom.xml diff --git a/streaming/pom.xml b/streaming/pom.xml index 974f5a52c77bc..5d32f825b11b9 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/tools/pom.xml b/tools/pom.xml index eb14e4fe28322..ef3843ac40609 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/unsafe/pom.xml b/unsafe/pom.xml index 606a0c8005e97..07a58b38515eb 100644 --- a/unsafe/pom.xml +++ b/unsafe/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml diff --git a/yarn/pom.xml b/yarn/pom.xml index 09f9f0f66dc36..e3fad4bff06b1 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -20,7 +20,7 @@ org.apache.spark spark-parent_2.10 - 1.4.1 + 1.4.1-palantir1 ../pom.xml