docs/running-on-mesos.md (9 additions, 0 deletions)

@@ -435,6 +435,15 @@ See the [configuration page](configuration.html) for information on Spark config
  the final overhead will be this value.
  </td>
</tr>
+<tr>
+  <td><code>spark.mesos.driver.memoryOverhead</code></td>
+  <td>driver memory * 0.10, with minimum of 384</td>
+  <td>
+    The amount of additional memory, specified in MB, to be allocated to the driver. By default,
+    the overhead will be the larger of either 384 or 10% of <code>spark.driver.memory</code>. If set,
+    the final overhead will be this value. Only applies to cluster mode.
+  </td>
+</tr>
<tr>
  <td><code>spark.mesos.uris</code></td>
  <td>(none)</td>
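For example, to pin the overhead instead of relying on the default, the option can be set on the application's SparkConf before submitting in cluster mode. A minimal sketch; the 512 MB value is illustrative:

```scala
import org.apache.spark.SparkConf

// Illustrative only: request a fixed 512 MB of driver container overhead
// instead of the default max(384, 0.10 * spark.driver.memory).
val conf = new SparkConf()
  .set("spark.driver.memory", "2g")
  .set("spark.mesos.driver.memoryOverhead", "512") // in MB; cluster mode only
```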
@@ -129,4 +129,12 @@ package object config {
      "when launching drivers. Default is to accept all offers with sufficient resources.")
    .stringConf
    .createWithDefault("")
+
+  private[spark] val DRIVER_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.mesos.driver.memoryOverhead")
+      .doc("The amount of additional memory, specified in MB, to be allocated to the driver. " +
+        "By default, the overhead will be the larger of either 384 or 10% of spark.driver.memory. " +
+        "Only applies to cluster mode.")
+      .intConf
+      .createOptional
}
@@ -568,7 +568,7 @@ private[spark] class MesosClusterScheduler(
      val (remainingResources, cpuResourcesToUse) =
        partitionResources(offer.remainingResources, "cpus", desc.cores)
      val (finalResources, memResourcesToUse) =
-       partitionResources(remainingResources.asJava, "mem", desc.mem)
+       partitionResources(remainingResources.asJava, "mem", driverContainerMemory(desc))
      offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
@@ -600,7 +600,7 @@
      tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
    for (submission <- candidates) {
      val driverCpu = submission.cores
-     val driverMem = submission.mem
+     val driverMem = driverContainerMemory(submission)
val driverConstraints =
parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
@@ -36,6 +36,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
+import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
@@ -404,6 +405,19 @@ trait MesosSchedulerUtils extends Logging {
    sc.executorMemory
  }

+  /**
+   * Return the amount of memory to allocate to the driver container, taking into
+   * account container overhead.
+   *
+   * @param driverDesc used to get the driver memory and any configured overhead
+   * @return the driver memory plus spark.mesos.driver.memoryOverhead if it is set; otherwise
+   *         the driver memory plus the larger of (MEMORY_OVERHEAD_FRACTION * driver memory)
+   *         and MEMORY_OVERHEAD_MINIMUM
+   */
+  def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
+    val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
+    driverDesc.conf.get(config.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) + driverDesc.mem
+  }
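To make the arithmetic concrete, here is a standalone sketch of the same computation (the literals 0.10 and 384 mirror MEMORY_OVERHEAD_FRACTION and MEMORY_OVERHEAD_MINIMUM; `containerMemoryMb` and `overrideMb` are illustrative names, with `overrideMb` standing in for the optional config value):

```scala
// Standalone sketch of the overhead arithmetic above; not the trait method itself.
def containerMemoryMb(driverMemMb: Int, overrideMb: Option[Int]): Int = {
  val defaultOverhead = math.max(0.10 * driverMemMb, 384).toInt // max(10%, 384 MB)
  driverMemMb + overrideMb.getOrElse(defaultOverhead)
}

containerMemoryMb(1000, None)    // 1384: 10% of 1000 is 100, so the 384 MB floor applies
containerMemoryMb(10000, None)   // 11000: 10% (1000) exceeds the floor
containerMemoryMb(1000, Some(0)) // 1000: an explicit override of 0 disables the overhead
```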

def setupUris(uris: String,
builder: CommandInfo.Builder,
useFetcherCache: Boolean = false): Unit = {
@@ -104,7 +104,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test"),
("spark.mesos.driver.memoryOverhead", "0")),
"s1",
new Date()))
assert(response.success)
@@ -199,6 +200,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
})
}

test("supports spark.mesos.driver.memoryOverhead") {
setScheduler()

val mem = 1000
val cpu = 1

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
scheduler.resourceOffers(driver, List(offer).asJava)
val tasks = Utils.verifyTaskLaunched(driver, "o1")
// 1384.0
val taskMem = tasks.head.getResourcesList
.asScala
.filter(_.getName.equals("mem"))
.map(_.getScalar.getValue)
.head
assert(1384.0 === taskMem)
+  }

Contributor: Can we also test the 10% case as well?
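A sketch of the 10% case the reviewer asks about, reusing this suite's helpers and assuming a driver memory large enough that 10% exceeds the 384 MB floor; the test name is illustrative:

```scala
// Hypothetical follow-up test for the 10% branch; helpers mirror the suite above.
test("driver memory overhead defaults to 10% when that exceeds the 384 MB minimum") {
  setScheduler()

  val mem = 10000 // 10% of this (1000) is larger than the 384 MB floor
  val cpu = 1

  val response = scheduler.submitDriver(
    new MesosDriverDescription("d1", "jar", mem, cpu, true,
      command,
      Map("spark.mesos.executor.home" -> "test",
        "spark.app.name" -> "test"),
      "s1",
      new Date()))
  assert(response.success)

  val offer = Utils.createOffer("o1", "s1", mem * 2, cpu)
  scheduler.resourceOffers(driver, List(offer).asJava)
  val tasks = Utils.verifyTaskLaunched(driver, "o1")
  val taskMem = tasks.head.getResourcesList
    .asScala
    .filter(_.getName.equals("mem"))
    .map(_.getScalar.getValue)
    .head
  // mem + max(0.1 * mem, 384) = 10000 + 1000 = 11000
  assert(11000.0 === taskMem)
}
```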

test("supports spark.mesos.driverEnv.*") {
setScheduler()

@@ -210,7 +238,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
"spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL",
"spark.mesos.driver.memoryOverhead" -> "0"
),
"s1",
new Date()))
assert(response.success)
@@ -235,7 +265,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"),
"spark.mesos.network.labels" -> "key1:val1,key2:val2",
"spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))

@@ -274,7 +305,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
-          config.DRIVER_CONSTRAINTS.key -> driverConstraints),
+          config.DRIVER_CONSTRAINTS.key -> driverConstraints,
+          "spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))
assert(response.success)
@@ -312,7 +344,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.labels" -> "key:value"),
"spark.mesos.driver.labels" -> "key:value",
"spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))

@@ -423,7 +456,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test") ++
"spark.app.name" -> "test",
"spark.mesos.driver.memoryOverhead" -> "0") ++
addlSparkConfVars,
"s1",
new Date())