diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7e564061e69b..365e0466d0ae 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -470,12 +470,25 @@ class SparkContext(config: SparkConf) extends Logging {
       files.foreach(addFile)
     }
 
-    _executorMemory = _conf.getOption("spark.executor.memory")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-      .orElse(Option(System.getenv("SPARK_MEM"))
-      .map(warnSparkMem))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(1024)
+    _executorMemory = {
+      val defaultMemory = 1024
+      val configuredMemory = _conf.getOption("spark.executor.memory")
+        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+        .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
+        .map(Utils.memoryStringToMb)
+      // In local-cluster mode, always use the slave memory specified in the master string
+      // In other modes, use the configured memory if it exists
+      master match {
+        case SparkMasterRegex.LOCAL_CLUSTER_REGEX(_, _, em) =>
+          if (configuredMemory.isDefined) {
+            logWarning(s"Ignoring explicit setting of executor " +
+              s"memory $configuredMemory in local-cluster mode")
+          }
+          em.toInt
+        case _ =>
+          configuredMemory.getOrElse(defaultMemory)
+      }
+    }
 
     // Convert java options to env vars as a work around
     // since we can't set env vars directly in sbt.
@@ -2707,14 +2720,8 @@ object SparkContext extends Logging {
         (backend, scheduler)
 
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (sc.executorMemory > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, sc.executorMemory))
-        }
-
+        assert(sc.executorMemory == memoryPerSlaveInt)
         val scheduler = new TaskSchedulerImpl(sc)
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5ffdedd1658a..533f07c1447a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -466,7 +466,7 @@ object SparkSubmit extends CommandLineUtils {
       // Other options
      OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.cores"),
-      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
+      OptionAssigner(args.executorMemory, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.memory"),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
        sysProp = "spark.cores.max"),
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 9417930d0240..ab19d95722bf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,22 @@ class SparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("executor memory in local-cluster mode") {
+    val executorMemoryMb = 1888
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val args = Seq(
+      "--class", LocalClusterExecutorMemoryTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", s"local-cluster[2,1,$executorMemoryMb]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.executor.memory=${executorMemoryMb * 2}", // not used
+      "--conf", "spark.testing.reservedMemory=0", // needed to avoid SPARK-12759
+      unusedJar.toString,
+      executorMemoryMb.toString)
+    runSparkSubmit(args)
+  }
+
   test("includes jars passed in through --jars") {
     val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
     val jar1 = TestUtils.createJarWithClasses(Seq("SparkSubmitClassA"))
@@ -720,6 +736,21 @@ object JarCreationTest extends Logging {
   }
 }
 
+object LocalClusterExecutorMemoryTest {
+  def main(args: Array[String]): Unit = {
+    Utils.configTestLog4j("INFO")
+    val sc = new SparkContext
+    if (args.length != 1) {
+      throw new IllegalArgumentException("Expected exactly 1 argument, got " + args.length)
+    }
+    val executorMemory = args.head.toInt
+    if (sc.executorMemory != executorMemory) {
+      throw new SparkException(
+        "Expected executor memory to be %s, was %s".format(executorMemory, sc.executorMemory))
+    }
+  }
+}
+
 object SimpleApplicationTest {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")