diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8d5ad12cb85b..7c4b8edd9a07 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -417,6 +417,15 @@ See the [configuration page](configuration.html) for information on Spark config
and resource weight sharing.
+
+ spark.mesos.failoverTimeout |
+ Integer.MAX_VALUE for cluster scheduler, 0.0 for Spark jobs |
+
+ The Mesos framework failover timeout in seconds. This acts as a threshold for Mesos masters.
+ If the Spark driver loses the connection to the Mesos master and does not reconnect within this timeout,
+ the Mesos master will remove the framework and its associated executors.
+ |
+
spark.mesos.constraints |
(none) |
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1bc6f71860c3..e3ba18e7ca62 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -313,7 +313,7 @@ private[spark] class MesosClusterScheduler(
conf,
Some(frameworkUrl),
Some(true),
- Some(Integer.MAX_VALUE),
+ conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble).orElse(Some(Integer.MAX_VALUE)),
fwId)
startScheduler(driver)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index c049a32eabf9..897d830ef366 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -165,7 +165,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
- None,
+ sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index f198f8893b3d..be197aab337d 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -82,7 +82,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
Option.empty,
- Option.empty,
+ sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 3f25535cb5ec..a36acc2551e7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -56,7 +56,8 @@ trait MesosSchedulerUtils extends Logging {
* @param conf Spark configuration
* @param webuiUrl The WebUI url to link from Mesos UI
* @param checkpoint Option to checkpoint tasks for failover
- * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect
+ * @param failoverTimeout Duration in seconds that the Mesos master waits for the scheduler to
+ * reconnect after a disconnect
* @param frameworkId The id of the new framework
*/
protected def createSchedulerDriver(
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 32967b04cd34..5f244e5e6959 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,9 +21,9 @@ import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
-import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
@@ -69,6 +69,45 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
new Date())
}
+ private def testFailoverTimeout(
+ confValue: Option[String],
+ expectedTimeout: Option[Double]): Unit = {
+ val schedulerAppName = "testApp"
+ val master = "mesos://localhost:5050"
+ val sparkConf = new SparkConf()
+ sparkConf.setMaster(master)
+ sparkConf.setAppName(schedulerAppName)
+ confValue.foreach(sparkConf.set("spark.mesos.failoverTimeout", _))
+
+ val driver = mock[SchedulerDriver]
+ when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)
+ val clusterScheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, sparkConf) {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String],
+ checkpoint: Option[Boolean],
+ failoverTimeout: Option[Double],
+ frameworkId: Option[String]): SchedulerDriver = {
+ markRegistered()
+
+ assert(masterUrl == master)
+ assert(appName == schedulerAppName)
+ assert(conf == sparkConf)
+ assert(checkpoint.contains(true))
+ assert(failoverTimeout == expectedTimeout)
+
+ driver
+ }
+ }
+
+ clusterScheduler.start()
+ }
+
test("can queue drivers") {
setScheduler()
@@ -306,4 +345,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
verify(driver, times(1)).declineOffer(offerId, filter)
}
+
+ test("supports spark.mesos.failoverTimeout") {
+ testFailoverTimeout(None, Some(Integer.MAX_VALUE))
+ testFailoverTimeout(Some("3"), Some(3.0))
+ testFailoverTimeout(Some("5.8"), Some(5.8))
+ }
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 98033bec6dd6..5f1ebe779e6a 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -594,4 +594,50 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
}
+
+ private def testFailoverTimeout(
+ confValue: Option[String],
+ expectedTimeout: Option[Double]): Unit = {
+ val master = "local[*]"
+ val sparkConf = new SparkConf()
+ .setMaster(master)
+ .setAppName("testApp")
+ confValue.foreach(sparkConf.set("spark.mesos.failoverTimeout", _))
+
+ resetSparkContext()
+ sc = new SparkContext(sparkConf)
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.sc).thenReturn(sc)
+ val securityManager = mock[SecurityManager]
+ val driver = mock[SchedulerDriver]
+ when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)
+ val clusterScheduler = new MesosCoarseGrainedSchedulerBackend(
+ taskScheduler, sc, master, securityManager) {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String],
+ checkpoint: Option[Boolean],
+ failoverTimeout: Option[Double],
+ frameworkId: Option[String]): SchedulerDriver = {
+ markRegistered()
+
+ assert(failoverTimeout == expectedTimeout)
+
+ driver
+ }
+ }
+
+ clusterScheduler.start()
+ }
+
+ test("supports spark.mesos.failoverTimeout") {
+ testFailoverTimeout(None, None)
+ testFailoverTimeout(Some("3"), Some(3.0))
+ testFailoverTimeout(Some("5.8"), Some(5.8))
+ }
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 4ee85b91830a..18f0923d26aa 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -401,4 +401,50 @@ class MesosFineGrainedSchedulerBackendSuite
r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
})
}
+
+ private def testFailoverTimeout(
+ confValue: Option[String],
+ expectedTimeout: Option[Double]): Unit = {
+ val schedulerAppName = "testApp"
+ val master = "mesos://localhost:5050"
+ val sparkConf = new SparkConf()
+ .setMaster(master)
+ .setAppName(schedulerAppName)
+ .set("spark.mesos.driver.webui.url", "http://webui")
+ confValue.foreach(sparkConf.set("spark.mesos.failoverTimeout", _))
+
+ val sc = mock[SparkContext]
+ when(sc.conf).thenReturn(sparkConf)
+
+ val taskScheduler = mock[TaskSchedulerImpl]
+ val driver = mock[SchedulerDriver]
+ when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)
+ val clusterScheduler = new MesosFineGrainedSchedulerBackend(
+ taskScheduler, sc, master) {
+ override protected def createSchedulerDriver(
+ masterUrl: String,
+ scheduler: Scheduler,
+ sparkUser: String,
+ appName: String,
+ conf: SparkConf,
+ webuiUrl: Option[String],
+ checkpoint: Option[Boolean],
+ failoverTimeout: Option[Double],
+ frameworkId: Option[String]): SchedulerDriver = {
+ markRegistered()
+
+ assert(failoverTimeout == expectedTimeout)
+
+ driver
+ }
+ }
+
+ clusterScheduler.start()
+ }
+
+ test("supports spark.mesos.failoverTimeout") {
+ testFailoverTimeout(None, None)
+ testFailoverTimeout(Some("3"), Some(3.0))
+ testFailoverTimeout(Some("5.8"), Some(5.8))
+ }
}