diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 3c2a1501ca69..7122304f6f01 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -435,6 +435,15 @@ See the [configuration page](configuration.html) for information on Spark config
     the final overhead will be this value.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.driver.memoryOverhead</code></td>
+  <td>driver memory * 0.10, with minimum of 384</td>
+  <td>
+    The amount of additional memory, specified in MB, to be allocated to the driver. By default,
+    the overhead will be the larger of either 384 or 10% of <code>spark.driver.memory</code>. If set,
+    the final overhead will be this value. Only applies to cluster mode.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.uris</code></td>
   <td>(none)</td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index d134847dc74d..08d33b0ec285 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -129,4 +129,12 @@ package object config {
       "when launching drivers. Default is to accept all offers with sufficient resources.")
     .stringConf
     .createWithDefault("")
+
+  private[spark] val DRIVER_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.mesos.driver.memoryOverhead")
+      .doc("The amount of additional memory, specified in MB, to be allocated to the driver. " +
+        "By default, the overhead will be the larger of either 384 or 10% of " +
+        "spark.driver.memory. Only applies to cluster mode.")
+      .intConf
+      .createOptional
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d224a7325820..0c2461f9111c 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -568,7 +568,7 @@ private[spark] class MesosClusterScheduler(
     val (remainingResources, cpuResourcesToUse) =
       partitionResources(offer.remainingResources, "cpus", desc.cores)
     val (finalResources, memResourcesToUse) =
-      partitionResources(remainingResources.asJava, "mem", desc.mem)
+      partitionResources(remainingResources.asJava, "mem", driverContainerMemory(desc))
     offer.remainingResources = finalResources.asJava
 
     val appName = desc.conf.get("spark.app.name")
@@ -600,7 +600,7 @@ private[spark] class MesosClusterScheduler(
       tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
     for (submission <- candidates) {
       val driverCpu = submission.cores
-      val driverMem = submission.mem
+      val driverMem = driverContainerMemory(submission)
       val driverConstraints =
         parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
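Editorial note: the scheduler above now requests `driverContainerMemory(desc)` instead of the bare `desc.mem`; the helper itself is added in the next file. As orientation, here is a minimal self-contained sketch of the sizing rule, assuming only that `MEMORY_OVERHEAD_FRACTION` is 0.10 and `MEMORY_OVERHEAD_MINIMUM` is 384 MB; `DriverSizingSketch`, `containerMem`, `overheadFraction`, and `overheadMinimum` are illustrative names, not Spark identifiers.

```scala
// A minimal sketch of the sizing rule this patch introduces (not Spark code).
// overheadFraction and overheadMinimum stand in for Spark's
// MEMORY_OVERHEAD_FRACTION (0.10) and MEMORY_OVERHEAD_MINIMUM (384).
object DriverSizingSketch {
  private val overheadFraction = 0.10
  private val overheadMinimum = 384

  // Total memory (MB) requested for the driver container: driver memory plus
  // the configured overhead, or max(0.10 * mem, 384) when none is configured.
  def containerMem(driverMemMb: Int, configuredOverheadMb: Option[Int]): Int = {
    val defaultOverhead = math.max(overheadFraction * driverMemMb, overheadMinimum).toInt
    driverMemMb + configuredOverheadMb.getOrElse(defaultOverhead)
  }

  def main(args: Array[String]): Unit = {
    println(containerMem(1000, None))      // 1384 = 1000 + max(100, 384)
    println(containerMem(8192, None))      // 9011 = 8192 + max(819, 384)
    println(containerMem(1000, Some(512))) // 1512: an explicit overhead replaces the default
  }
}
```

Note that an explicit setting replaces the default overhead rather than adding to it, matching the documented behavior of the executor-side `spark.mesos.executor.memoryOverhead`.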
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index ecbcc960fc5a..c2cbd8375376 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -36,6 +36,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.TaskState
+import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
@@ -404,6 +405,19 @@ trait MesosSchedulerUtils extends Logging {
     sc.executorMemory
   }
 
+  /**
+   * Return the amount of memory to allocate to each driver, taking into account
+   * container overheads.
+   *
+   * @param driverDesc used to get driver memory
+   * @return the driver memory plus the overhead: spark.mesos.driver.memoryOverhead if set,
+   *         otherwise the larger of (0.1 * driver memory) or MEMORY_OVERHEAD_MINIMUM
+   */
+  def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
+    val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
+    driverDesc.conf.get(config.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) + driverDesc.mem
+  }
+
   def setupUris(uris: String,
       builder: CommandInfo.Builder,
       useFetcherCache: Boolean = false): Unit = {
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index e534b9d7e3ed..7a387c41626e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -104,7 +104,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 1200, 1.5, true, command,
-        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test"),
+          ("spark.mesos.driver.memoryOverhead", "0")),
         "s1",
         new Date()))
     assert(response.success)
@@ -199,6 +200,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     })
   }
 
+  test("supports spark.mesos.driver.memoryOverhead") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test"),
+        "s1",
+        new Date()))
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+    val tasks = Utils.verifyTaskLaunched(driver, "o1")
+    // mem + default overhead: 1000 + max(0.1 * 1000, 384) = 1384.0
+    val taskMem = tasks.head.getResourcesList
+      .asScala
+      .filter(_.getName.equals("mem"))
+      .map(_.getScalar.getValue)
+      .head
+    assert(1384.0 === taskMem)
+  }
+
   test("supports spark.mesos.driverEnv.*") {
     setScheduler()
 
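The asserted `1384.0` follows from the rule above: no explicit overhead is set, so the launched task should request `mem + max(0.1 * mem, 384)`, and the offer is sized at `mem*2` so it can cover the overhead. Restated as a plain Scala check (a sketch, not suite code; `mem` and `expected` are illustrative names):

```scala
// Expected task memory for a 1000 MB driver with no explicit overhead.
val mem = 1000
val expected = mem + math.max(0.1 * mem, 384).toInt // 1000 + 384 = 1384
assert(expected == 1384)
assert(mem * 2 >= expected) // the mem*2 offer still satisfies the request
```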
+ "spark.mesos.network.labels" -> "key1:val1,key2:val2", + "spark.mesos.driver.memoryOverhead" -> "0"), "s1", new Date())) @@ -274,7 +305,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.executor.home" -> "test", "spark.app.name" -> "test", - config.DRIVER_CONSTRAINTS.key -> driverConstraints), + config.DRIVER_CONSTRAINTS.key -> driverConstraints, + "spark.mesos.driver.memoryOverhead" -> "0"), "s1", new Date())) assert(response.success) @@ -312,7 +344,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi command, Map("spark.mesos.executor.home" -> "test", "spark.app.name" -> "test", - "spark.mesos.driver.labels" -> "key:value"), + "spark.mesos.driver.labels" -> "key:value", + "spark.mesos.driver.memoryOverhead" -> "0"), "s1", new Date())) @@ -423,7 +456,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi true, command, Map("spark.mesos.executor.home" -> "test", - "spark.app.name" -> "test") ++ + "spark.app.name" -> "test", + "spark.mesos.driver.memoryOverhead" -> "0") ++ addlSparkConfVars, "s1", new Date())