Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,16 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a>
</td>
</tr>
<tr>
<td><code>spark.mesos.checkpoint</code></td>
<td>false</td>
<td>
If set, agents running tasks started by this framework will write the framework pid, executor pids and status updates to disk.

@mgummelt mgummelt May 24, 2017

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's customize this copy a bit for Spark instead of just copying the protobuf docs. e.g. "tasks" should be "executors" and you should remove the part about "this framework", in place of something about Spark in particular.

If the agent exits (e.g., due to a crash or as part of upgrading Mesos), this checkpointed data allows the restarted agent to
reconnect to executors that were started by the old instance of the agent. Enabling checkpointing improves fault tolerance,
at the cost of a (usually small) increase in disk I/O.
</td>
</tr>
</table>

# Troubleshooting and Debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
sc.conf.getOption("spark.mesos.checkpoint").map(_.toBoolean),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None,
sc.conf.getOption("spark.mesos.driver.frameworkId")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
sc.appName,
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
Option.empty,
Option.empty,
None,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better not to touch this.

None,
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,40 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(cpus == offerCores)
}

test("mesos supports checkpointing") {

val checkpoint = true
setBackend(Map("spark.mesos.checkpoint" -> checkpoint.toString,
"spark.mesos.driver.webui.url" -> "http://webui"))

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)
val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(checkpoint.contains(true))
driver
}
}

backend.start()

}

test("mesos does not acquire more than spark.cores.max") {
val maxCores = 10
setBackend(Map("spark.cores.max" -> maxCores.toString))
Expand Down