Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,15 @@ See the [configuration page](configuration.html) for information on Spark config
and resource weight sharing.
</td>
</tr>
<tr>
<td><code>spark.mesos.failoverTimeout</code></td>
<td><code>Integer.MAX_VALUE</code> for cluster scheduler, <code>0.0</code> for Spark jobs</td>
<td>
The Mesos framework failover timeout in seconds. This is how long the Mesos master waits for a
disconnected framework to reconnect. If the Spark driver loses its connection to the Mesos master and
does not reconnect within this timeout, the Mesos master removes the associated executors.
</td>
</tr>
<tr>
<td><code>spark.mesos.constraints</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private[spark] class MesosClusterScheduler(
conf,
Some(frameworkUrl),
Some(true),
Some(Integer.MAX_VALUE),
conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble).orElse(Some(Integer.MAX_VALUE)),
fwId)

startScheduler(driver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
None,
sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
Option.empty,
Option.empty,
sc.conf.getOption("spark.mesos.failoverTimeout").map(_.toDouble),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ trait MesosSchedulerUtils extends Logging {
* @param conf Spark configuration
* @param webuiUrl The WebUI url to link from Mesos UI
* @param checkpoint Option to checkpoint tasks for failover
* @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect
* @param failoverTimeout Time in seconds the Mesos master waits for the scheduler to reconnect
* after a disconnect
* @param frameworkId The id of the new framework
*/
protected def createSchedulerDriver(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import java.util.{Collection, Collections, Date}

import scala.collection.JavaConverters._

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
Expand Down Expand Up @@ -69,6 +69,45 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
new Date())
}

/**
 * Starts a [[MesosClusterScheduler]] whose `createSchedulerDriver` is overridden to check the
 * arguments it receives, in particular the failover timeout.
 *
 * @param confValue value to set for `spark.mesos.failoverTimeout`; `None` leaves it unset
 * @param expectedTimeout failover timeout the scheduler is expected to pass to the driver factory
 */
private def testFailoverTimeout(
    confValue: Option[String],
    expectedTimeout: Option[Double]): Unit = {
  val master = "mesos://localhost:5050"
  val schedulerAppName = "testApp"
  val sparkConf = new SparkConf().setMaster(master).setAppName(schedulerAppName)
  confValue.foreach { timeout => sparkConf.set("spark.mesos.failoverTimeout", timeout) }

  // Mocked driver: run() reports DRIVER_RUNNING instead of talking to a real Mesos master.
  val driver = mock[SchedulerDriver]
  when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)

  val persistenceEngineFactory = new BlackHoleMesosClusterPersistenceEngineFactory
  val schedulerUnderTest = new MesosClusterScheduler(persistenceEngineFactory, sparkConf) {
    // Intercepts driver creation so the arguments can be inspected and the mock returned.
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String],
        checkpoint: Option[Boolean],
        failoverTimeout: Option[Double],
        frameworkId: Option[String]): SchedulerDriver = {
      // NOTE(review): presumably flags the scheduler as registered so start() does not
      // block waiting for a real registration -- confirm against MesosSchedulerUtils.
      markRegistered()

      assert(masterUrl == master)
      assert(appName == schedulerAppName)
      assert(conf == sparkConf)
      assert(checkpoint.contains(true))
      assert(failoverTimeout == expectedTimeout)

      driver
    }
  }

  schedulerUnderTest.start()
}

test("can queue drivers") {
setScheduler()

Expand Down Expand Up @@ -306,4 +345,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi

verify(driver, times(1)).declineOffer(offerId, filter)
}

// Unset falls back to Integer.MAX_VALUE for the cluster scheduler; explicit values are
// parsed as doubles.
test("supports spark.mesos.failoverTimeout") {
  val cases: Seq[(Option[String], Option[Double])] = Seq(
    (None, Some(Integer.MAX_VALUE.toDouble)),
    (Some("3"), Some(3.0)),
    (Some("5.8"), Some(5.8)))

  cases.foreach { case (confValue, expectedTimeout) =>
    testFailoverTimeout(confValue, expectedTimeout)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -594,4 +594,50 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
}

/**
 * Starts a [[MesosCoarseGrainedSchedulerBackend]] whose `createSchedulerDriver` is overridden
 * to check the failover timeout it receives.
 *
 * @param confValue value to set for `spark.mesos.failoverTimeout`; `None` leaves it unset
 * @param expectedTimeout failover timeout the backend is expected to pass to the driver factory
 */
private def testFailoverTimeout(
    confValue: Option[String],
    expectedTimeout: Option[Double]): Unit = {
  val localMaster = "local[*]"
  val sparkConf = new SparkConf().setMaster(localMaster).setAppName("testApp")
  confValue.foreach { timeout => sparkConf.set("spark.mesos.failoverTimeout", timeout) }

  // Replace any context left over from a previous invocation with one built from sparkConf.
  resetSparkContext()
  sc = new SparkContext(sparkConf)

  val mockedTaskScheduler = mock[TaskSchedulerImpl]
  when(mockedTaskScheduler.sc).thenReturn(sc)
  val mockedSecurityManager = mock[SecurityManager]

  // Mocked driver: run() reports DRIVER_RUNNING instead of talking to a real Mesos master.
  val driver = mock[SchedulerDriver]
  when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)

  val schedulerBackend = new MesosCoarseGrainedSchedulerBackend(
      mockedTaskScheduler, sc, localMaster, mockedSecurityManager) {
    // Intercepts driver creation so the timeout can be inspected and the mock returned.
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String],
        checkpoint: Option[Boolean],
        failoverTimeout: Option[Double],
        frameworkId: Option[String]): SchedulerDriver = {
      // NOTE(review): presumably flags the backend as registered so start() does not
      // block waiting for a real registration -- confirm against MesosSchedulerUtils.
      markRegistered()

      assert(failoverTimeout == expectedTimeout)

      driver
    }
  }

  schedulerBackend.start()
}

// Unset yields no failover timeout for the coarse-grained backend; explicit values are
// parsed as doubles.
test("supports spark.mesos.failoverTimeout") {
  val cases: Seq[(Option[String], Option[Double])] = Seq(
    (None, None),
    (Some("3"), Some(3.0)),
    (Some("5.8"), Some(5.8)))

  cases.foreach { case (confValue, expectedTimeout) =>
    testFailoverTimeout(confValue, expectedTimeout)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,50 @@ class MesosFineGrainedSchedulerBackendSuite
r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
})
}

/**
 * Starts a [[MesosFineGrainedSchedulerBackend]] whose `createSchedulerDriver` is overridden
 * to check the failover timeout it receives.
 *
 * @param confValue value to set for `spark.mesos.failoverTimeout`; `None` leaves it unset
 * @param expectedTimeout failover timeout the backend is expected to pass to the driver factory
 */
private def testFailoverTimeout(
    confValue: Option[String],
    expectedTimeout: Option[Double]): Unit = {
  val master = "mesos://localhost:5050"
  val schedulerAppName = "testApp"
  val sparkConf = new SparkConf()
    .setMaster(master)
    .setAppName(schedulerAppName)
    .set("spark.mesos.driver.webui.url", "http://webui")
  confValue.foreach { timeout => sparkConf.set("spark.mesos.failoverTimeout", timeout) }

  // A mocked SparkContext is enough here: only its conf is stubbed.
  val sc = mock[SparkContext]
  when(sc.conf).thenReturn(sparkConf)

  val mockedTaskScheduler = mock[TaskSchedulerImpl]

  // Mocked driver: run() reports DRIVER_RUNNING instead of talking to a real Mesos master.
  val driver = mock[SchedulerDriver]
  when(driver.run()).thenReturn(Protos.Status.DRIVER_RUNNING)

  val schedulerBackend = new MesosFineGrainedSchedulerBackend(mockedTaskScheduler, sc, master) {
    // Intercepts driver creation so the timeout can be inspected and the mock returned.
    override protected def createSchedulerDriver(
        masterUrl: String,
        scheduler: Scheduler,
        sparkUser: String,
        appName: String,
        conf: SparkConf,
        webuiUrl: Option[String],
        checkpoint: Option[Boolean],
        failoverTimeout: Option[Double],
        frameworkId: Option[String]): SchedulerDriver = {
      // NOTE(review): presumably flags the backend as registered so start() does not
      // block waiting for a real registration -- confirm against MesosSchedulerUtils.
      markRegistered()

      assert(failoverTimeout == expectedTimeout)

      driver
    }
  }

  schedulerBackend.start()
}

// Unset yields no failover timeout for the fine-grained backend; explicit values are
// parsed as doubles.
test("supports spark.mesos.failoverTimeout") {
  val cases: Seq[(Option[String], Option[Double])] = Seq(
    (None, None),
    (Some("3"), Some(3.0)),
    (Some("5.8"), Some(5.8)))

  cases.foreach { case (confValue, expectedTimeout) =>
    testFailoverTimeout(confValue, expectedTimeout)
  }
}
}