From 16df0d7c2fd5ee9e1562effe1c382b603ae33b43 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 15 Feb 2022 07:54:24 -0500 Subject: [PATCH 01/10] Unify memory overhead configs --- .../scala/org/apache/spark/SparkConf.scala | 4 +++- .../spark/internal/config/package.scala | 21 +++++++++++++++++++ .../org/apache/spark/deploy/k8s/Config.scala | 2 +- .../k8s/features/BasicDriverFeatureStep.scala | 8 ++++--- .../features/BasicExecutorFeatureStep.scala | 4 +++- .../cluster/mesos/MesosSchedulerUtils.scala | 9 ++++---- .../org/apache/spark/deploy/yarn/Client.scala | 9 +++++--- .../spark/deploy/yarn/YarnAllocator.scala | 5 ++++- .../deploy/yarn/YarnSparkHadoopUtil.scala | 6 ------ .../deploy/yarn/YarnAllocatorSuite.scala | 14 +++++++++++++ 10 files changed, 62 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 5f37a1abb1909..cf121749b7348 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -636,7 +636,9 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0", "Please use spark.excludeOnFailure.killExcludedExecutors"), DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0", - "Please use spark.yarn.executor.launch.excludeOnFailure.enabled") + "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"), + DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0", + "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index dbec61a1fdb76..7578e960ebeb6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -105,6 +105,17 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR = + ConfigBuilder("spark.driver.memoryOverheadFactor") + .doc("The amount of non-heap memory to be allocated per driver in cluster mode, " + + "as a fraction of total driver memory. If memory overhead is specified directly, " + + "it takes precedence.") + .version("3.3.0") + .doubleConf + .checkValue(factor => factor >= 0 && factor < 1, + "Ensure that memory overhead is a double between 0 --> 1.0") + .createWithDefault(0.1) + private[spark] val DRIVER_LOG_DFS_DIR = ConfigBuilder("spark.driver.log.dfsDir").version("3.0.0").stringConf.createOptional @@ -315,6 +326,16 @@ package object config { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR = + ConfigBuilder("spark.executor.memoryOverheadFactor") + .doc("The amount of non-heap memory to be allocated per executor, as a fraction of total " + + "executor memory. 
If memory overhead is specified directly, it takes precedence.") + .version("3.3.0") + .doubleConf + .checkValue(factor => factor >= 0 && factor < 1, + "Ensure that memory overhead is a double between 0 --> 1.0") + .createWithDefault(0.1) + private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max") .doc("When running on a standalone deploy cluster or a Mesos cluster in coarse-grained " + "sharing mode, the maximum amount of CPU cores to request for the application from across " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 58a4a785b5182..9a61950469d71 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -506,7 +506,7 @@ private[spark] object Config extends Logging { .doubleConf .checkValue(mem_overhead => mem_overhead >= 0, "Ensure that memory overhead is non-negative") - .createWithDefault(0.1) + .createOptional val PYSPARK_MAJOR_PYTHON_VERSION = ConfigBuilder("spark.kubernetes.pyspark.pythonVersion") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index f2104d433ad49..57202460b2492 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -53,18 +53,20 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + private val memoryOverheadFactor = conf.get(MEMORY_OVERHEAD_FACTOR) + 
.getOrElse(conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)) // The memory overhead factor to use. If the user has not set it, then use a different // value for non-JVM apps. This value is propagated to executors. private val overheadFactor = if (conf.mainAppResource.isInstanceOf[NonJVMResource]) { - if (conf.contains(MEMORY_OVERHEAD_FACTOR)) { - conf.get(MEMORY_OVERHEAD_FACTOR) + if (conf.contains(MEMORY_OVERHEAD_FACTOR) || conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) { + memoryOverheadFactor } else { NON_JVM_MEMORY_OVERHEAD_FACTOR } } else { - conf.get(MEMORY_OVERHEAD_FACTOR) + memoryOverheadFactor } private val memoryOverheadMiB = conf diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index c6084720c56fe..eec4967b8a9d5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -59,11 +59,13 @@ private[spark] class BasicExecutorFeatureStep( private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON) private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) + private val memoryOverheadFactor = kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) + .getOrElse(kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, resourceProfile.executorResources, - kubernetesConf.get(MEMORY_OVERHEAD_FACTOR), + memoryOverheadFactor, kubernetesConf.sparkConf, isPythonApp, Map.empty) diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 38f83df00e428..524b1d514fafe 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -387,8 +387,7 @@ trait MesosSchedulerUtils extends Logging { } } - // These defaults copied from YARN - private val MEMORY_OVERHEAD_FRACTION = 0.10 + // This default copied from YARN private val MEMORY_OVERHEAD_MINIMUM = 384 /** @@ -400,8 +399,9 @@ trait MesosSchedulerUtils extends Logging { * (whichever is larger) */ def executorMemory(sc: SparkContext): Int = { + val memoryOverheadFactor = sc.conf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) sc.conf.get(mesosConfig.EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + + math.max(memoryOverheadFactor * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + sc.executorMemory } @@ -415,7 +415,8 @@ trait MesosSchedulerUtils extends Logging { * `MEMORY_OVERHEAD_FRACTION (=0.1) * driverMemory` */ def driverContainerMemory(driverDesc: MesosDriverDescription): Int = { - val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM) + val memoryOverheadFactor = driverDesc.conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + val defaultMem = math.max(memoryOverheadFactor * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM) driverDesc.conf.get(mesosConfig.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) + driverDesc.mem } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ae85ea8d6110a..ef71b1ce8abc9 100644 --- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -70,7 +70,6 @@ private[spark] class Client( extends Logging { import Client._ - import YarnSparkHadoopUtil._ private val yarnClient = YarnClient.createYarnClient private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) @@ -85,6 +84,8 @@ private[spark] class Client( private var appMaster: ApplicationMaster = _ private var stagingDirPath: Path = _ + private val amMemoryOverheadFactor = sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + // AM related configurations private val amMemory = if (isClusterMode) { sparkConf.get(DRIVER_MEMORY).toInt @@ -94,7 +95,7 @@ private[spark] class Client( private val amMemoryOverhead = { val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD sparkConf.get(amMemoryOverheadEntry).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, + math.max((amMemoryOverheadFactor * amMemory).toLong, ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt } private val amCores = if (isClusterMode) { @@ -107,8 +108,10 @@ private[spark] class Client( private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) // Executor offHeap memory in MiB. 
protected val executorOffHeapMemory = Utils.executorOffHeapMemorySizeAsMb(sparkConf) + + private val executorMemoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, + math.max((executorMemoryOverheadFactor * executorMemory).toLong, ResourceProfile.MEMORY_OVERHEAD_MIN_MIB)).toInt private val isPython = sparkConf.get(IS_PYTHON_APP) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 54ab643f2755b..d922d2c028818 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -280,9 +280,12 @@ private[yarn] class YarnAllocator( // track the resource profile if not already there getOrUpdateRunningExecutorForRPId(rp.id) logInfo(s"Resource profile ${rp.id} doesn't exist, adding it") + + val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + val resourcesWithDefaults = ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources, - MEMORY_OVERHEAD_FACTOR, sparkConf, isPythonApp, resourceNameMapping) + memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping) val customSparkResources = resourcesWithDefaults.customResources.map { case (name, execReq) => (name, execReq.amount.toString) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index f347e37ba24ab..a7d6301f48804 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -34,12 +34,6 @@ import org.apache.spark.util.Utils object YarnSparkHadoopUtil { - // Additional memory overhead - // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering - // the common cases. Memory overhead tends to grow with container size. - - val MEMORY_OVERHEAD_FACTOR = 0.10 - val ANY_HOST = "*" // All RM requests are issued with same priority : we do not (yet) have any distinction between diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index db65d128b07f0..089a1ffb86972 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -706,4 +706,18 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize) } } + + test("SPARK-38194: Configurable memory overhead factor") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt + try { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5) + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemory + assert(memory == (executorMemory * 1.5).toInt) + } finally { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) + } + } } From 2514140a0b94e5583b5cb023da4fe6d8906c3200 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 15 Feb 2022 08:20:13 -0500 Subject: [PATCH 02/10] Update k8s driver tests --- .../deploy/k8s/features/BasicDriverFeatureStep.scala | 2 +- .../k8s/features/BasicDriverFeatureStepSuite.scala | 10 +++++----- 2 
files changed, 6 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 57202460b2492..4e57e977dd9d5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -165,7 +165,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", - MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + DRIVER_MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) // try upload local, resolvable files to a hadoop compatible file system Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key => val uris = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 83444e5518e32..9bd0429c2af94 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -129,7 +129,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> KubernetesTestConf.APP_ID, "spark.kubernetes.submitInDriver" -> "true", - MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) + 
DRIVER_MEMORY_OVERHEAD_FACTOR.key -> DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } @@ -188,7 +188,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { // Memory overhead tests. Tuples are: // test name, main resource, overhead factor, expected factor Seq( - ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("java", JavaMainAppResource(None), None, DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get), ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) @@ -196,13 +196,13 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { test(s"memory overhead factor: $name") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB val driverMem = - ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 // main app resource, overhead factor val sparkConf = new SparkConf(false) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") - factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + factor.foreach { value => sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, value) } val conf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf, mainAppResource = resource) @@ -213,7 +213,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(mem === s"${expected}Mi") val systemProperties = step.getAdditionalPodSystemProperties() - assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) } } From fbfee7be63e5fd94312ba0b86a346381053ce154 
Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 15 Feb 2022 20:55:24 -0500 Subject: [PATCH 03/10] Fix k8s test --- .../scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index eecaff262bf66..d9a736d8abc8a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -97,7 +97,7 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS) assert(conf.environment === CUSTOM_ENVS) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) + assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR).get === 0.3) } test("Basic executor translated fields.") { From d241a9f929959c04283057884f8c9ef6abe85809 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 16 Feb 2022 08:12:01 -0500 Subject: [PATCH 04/10] Update mesos rest server and test --- .../apache/spark/deploy/rest/mesos/MesosRestServer.scala | 5 ++++- .../spark/deploy/rest/mesos/MesosRestServerSuite.scala | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala index 2fd13a5903243..9e4187837b680 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala @@ -105,6 +105,7 @@ private[mesos] class 
MesosSubmitRequestServlet( val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key) val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key) val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key) + val driverMemoryOverheadFactor = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key) val driverCores = sparkProperties.get(config.DRIVER_CORES.key) val name = request.sparkProperties.getOrElse("spark.app.name", mainClass) @@ -121,8 +122,10 @@ private[mesos] class MesosSubmitRequestServlet( mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverMemoryFactor = driverMemoryOverheadFactor.map(_.toDouble).getOrElse( + MEMORY_OVERHEAD_FACTOR) val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse( - math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN)) + math.max((actualDriverMemoryFactor * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN)) val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES) val submitDate = new Date() val submissionId = newDriverId(submitDate) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala index 344fc38c84fb1..8bed43a54d5d0 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala @@ -35,10 +35,16 @@ class MesosRestServerSuite extends SparkFunSuite testOverheadMemory(new SparkConf(), "2000M", 2384) } - test("test driver overhead memory with overhead factor") { + test("test 
driver overhead memory with default overhead factor") { testOverheadMemory(new SparkConf(), "5000M", 5500) } + test("test driver overhead memory with overhead factor") { + val conf = new SparkConf() + conf.set(config.DRIVER_MEMORY_OVERHEAD_FACTOR.key, "0.2") + testOverheadMemory(conf, "5000M", 6000) + } + test("test configured driver overhead memory") { val conf = new SparkConf() conf.set(config.DRIVER_MEMORY_OVERHEAD.key, "1000") From f65d0e4a25d8a3e6827ad174438f99565bc850f2 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 17 Feb 2022 07:33:10 -0500 Subject: [PATCH 05/10] Update k8s setting order, update docs, and add yarn AM client mode overhead factor setting --- .../spark/internal/config/package.scala | 19 ++++++++++--------- .../org/apache/spark/deploy/k8s/Config.scala | 2 +- .../k8s/features/BasicDriverFeatureStep.scala | 7 +++++-- .../features/BasicExecutorFeatureStep.scala | 7 +++++-- .../deploy/k8s/KubernetesConfSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 6 +++++- .../org/apache/spark/deploy/yarn/config.scala | 5 +++++ 7 files changed, 32 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 7578e960ebeb6..db2d2f4a8c185 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -107,13 +107,13 @@ package object config { private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.driver.memoryOverheadFactor") - .doc("The amount of non-heap memory to be allocated per driver in cluster mode, " + - "as a fraction of total driver memory. 
If memory overhead is specified directly, " + - "it takes precedence.") + .doc("This sets the Memory Overhead Factor on the driver that will allocate memory to " + + "non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various " + + "systems processes, and tmpfs-based local directories.") .version("3.3.0") .doubleConf - .checkValue(factor => factor >= 0 && factor < 1, - "Ensure that memory overhead is a double between 0 --> 1.0") + .checkValue(factor => factor > 0, + "Ensure that memory overhead is a double greater than 0") .createWithDefault(0.1) private[spark] val DRIVER_LOG_DFS_DIR = @@ -328,12 +328,13 @@ package object config { private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.executor.memoryOverheadFactor") - .doc("The amount of non-heap memory to be allocated per executor, as a fraction of total " + - "executor memory. If memory overhead is specified directly, it takes precedence.") + .doc("This sets the Memory Overhead Factor on executors that will allocate memory to " + + "non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various " + + "systems processes, and tmpfs-based local directories.") .version("3.3.0") .doubleConf - .checkValue(factor => factor >= 0 && factor < 1, - "Ensure that memory overhead is a double between 0 --> 1.0") + .checkValue(factor => factor > 0, + "Ensure that memory overhead is a double greater than 0") .createWithDefault(0.1) private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9a61950469d71..58a4a785b5182 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -506,7 +506,7 @@ private[spark] object Config 
extends Logging { .doubleConf .checkValue(mem_overhead => mem_overhead >= 0, "Ensure that memory overhead is non-negative") - .createOptional + .createWithDefault(0.1) val PYSPARK_MAJOR_PYTHON_VERSION = ConfigBuilder("spark.kubernetes.pyspark.pythonVersion") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 4e57e977dd9d5..e490bea6382e2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -53,8 +53,11 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf) // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) - private val memoryOverheadFactor = conf.get(MEMORY_OVERHEAD_FACTOR) - .getOrElse(conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR)) + private val memoryOverheadFactor = if (conf.contains(DRIVER_MEMORY_OVERHEAD_FACTOR)) { + conf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + } else { + conf.get(MEMORY_OVERHEAD_FACTOR) + } // The memory overhead factor to use. If the user has not set it, then use a different // value for non-JVM apps. This value is propagated to executors. 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index eec4967b8a9d5..b0e5298d6f39d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -59,8 +59,11 @@ private[spark] class BasicExecutorFeatureStep( private val isDefaultProfile = resourceProfile.id == ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID private val isPythonApp = kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON) private val disableConfigMap = kubernetesConf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP) - private val memoryOverheadFactor = kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) - .getOrElse(kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) + private val memoryOverheadFactor = if (kubernetesConf.contains(EXECUTOR_MEMORY_OVERHEAD_FACTOR)) { + kubernetesConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + } else { + kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) + } val execResources = ResourceProfile.getResourcesForClusterManager( resourceProfile.id, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index d9a736d8abc8a..eecaff262bf66 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -97,7 +97,7 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS) assert(conf.secretEnvNamesToKeyRefs === 
SECRET_ENV_VARS) assert(conf.environment === CUSTOM_ENVS) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR).get === 0.3) + assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) } test("Basic executor translated fields.") { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ef71b1ce8abc9..b53f2499be5b3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -84,7 +84,11 @@ private[spark] class Client( private var appMaster: ApplicationMaster = _ private var stagingDirPath: Path = _ - private val amMemoryOverheadFactor = sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + private val amMemoryOverheadFactor = if (isClusterMode) { + sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) + } else { + sparkConf.get(AM_MEMORY_OVERHEAD_FACTOR) + } // AM related configurations private val amMemory = if (isClusterMode) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 95d1cd150617d..02de88157ac4c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -320,6 +320,11 @@ package object config extends Logging { .bytesConf(ByteUnit.MiB) .createOptional + private[spark] val AM_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.yarn.am.memoryOverheadFactor") + .version("3.3.0") + .doubleConf + .createWithDefault(0.1) + private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory") .version("1.3.0") .bytesConf(ByteUnit.MiB) From 740e97088c3fcc3bdcbe211291d7143d768c2596 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sat, 19 Feb 2022 09:08:06 -0500 Subject: [PATCH 06/10] Remove 
specific yarn AM setting --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 3 ++- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 5 +++++ .../src/main/scala/org/apache/spark/deploy/yarn/config.scala | 5 ----- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b53f2499be5b3..f364b79216098 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -54,6 +54,7 @@ import org.apache.spark.api.python.PythonUtils import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil} import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.ResourceRequestHelper._ +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -87,7 +88,7 @@ private[spark] class Client( private val amMemoryOverheadFactor = if (isClusterMode) { sparkConf.get(DRIVER_MEMORY_OVERHEAD_FACTOR) } else { - sparkConf.get(AM_MEMORY_OVERHEAD_FACTOR) + AM_MEMORY_OVERHEAD_FACTOR } // AM related configurations diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a7d6301f48804..1869c739e4844 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -34,6 +34,11 @@ import org.apache.spark.util.Utils object YarnSparkHadoopUtil { + // Additional memory overhead for application masters in client mode. 
+ // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering + // the common cases. Memory overhead tends to grow with container size. + val AM_MEMORY_OVERHEAD_FACTOR = 0.10 + val ANY_HOST = "*" // All RM requests are issued with same priority : we do not (yet) have any distinction between diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 02de88157ac4c..95d1cd150617d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -320,11 +320,6 @@ package object config extends Logging { .bytesConf(ByteUnit.MiB) .createOptional - private[spark] val AM_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.yarn.am.memoryOverheadFactor") - .version("3.3.0") - .doubleConf - .createWithDefault(0.1) - private[spark] val AM_MEMORY = ConfigBuilder("spark.yarn.am.memory") .version("1.3.0") .bytesConf(ByteUnit.MiB) From 9e360559ad1143cd2d3bbd1909ee1a826f26885a Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Sun, 20 Feb 2022 10:44:17 -0500 Subject: [PATCH 07/10] Add new configs to docs --- docs/configuration.md | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ae3f422f34b3a..7e5c9101db8a2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -183,7 +183,7 @@ of the most common options to set are: spark.driver.memoryOverhead - driverMemory * 0.10, with minimum of 384 + driverMemory * spark.driver.memoryOverheadFactor, with minimum of 384 Amount of non-heap memory to be allocated per driver process in cluster mode, in MiB unless otherwise specified. 
This is memory that accounts for things like VM overheads, interned strings, @@ -198,6 +198,16 @@ of the most common options to set are: 2.3.0 + + spark.driver.memoryOverheadFactor + 0.10 + + Fraction of driver memory to be allocated as additional non-heap memory per driver process + in cluster mode. This value is ignored if spark.driver.memoryOverhead is set + directly. + + 3.3.0 + spark.driver.resource.{resourceName}.amount 0 @@ -272,7 +282,7 @@ of the most common options to set are: spark.executor.memoryOverhead - executorMemory * 0.10, with minimum of 384 + executorMemory * spark.executor.memoryOverheadFactor, with minimum of 384 Amount of additional memory to be allocated per executor process, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. @@ -287,6 +297,15 @@ of the most common options to set are: 2.3.0 + + spark.executor.memoryOverheadFactor + 0.10 + + Fraction of executor memory to be allocated as additional non-heap memory per executor process. + This value is ignored if spark.executor.memoryOverhead is set directly. 
+ + 3.3.0 + spark.executor.resource.{resourceName}.amount 0 From 4f8e7e6589d90d23c252278d1365b069fc9634fd Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 1 Mar 2022 20:30:09 -0500 Subject: [PATCH 08/10] Update config docs --- .../org/apache/spark/internal/config/package.scala | 14 ++++++++------ docs/configuration.md | 9 ++++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index db2d2f4a8c185..9ca4ee47e7372 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -107,9 +107,10 @@ package object config { private[spark] val DRIVER_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.driver.memoryOverheadFactor") - .doc("This sets the Memory Overhead Factor on the driver that will allocate memory to " + - "non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various " + - "systems processes, and tmpfs-based local directories.") + .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " + + "process in cluster mode. This is memory that accounts for things like VM overheads, " + + "interned strings, other native overheads, etc. This tends to grow with the container " + + "size. 
This value is ignored if spark.driver.memoryOverhead is set directly.") .version("3.3.0") .doubleConf .checkValue(factor => factor > 0, @@ -328,9 +329,10 @@ package object config { private[spark] val EXECUTOR_MEMORY_OVERHEAD_FACTOR = ConfigBuilder("spark.executor.memoryOverheadFactor") - .doc("This sets the Memory Overhead Factor on executors that will allocate memory to " + - "non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various " + - "systems processes, and tmpfs-based local directories.") + .doc("Fraction of executor memory to be allocated as additional non-heap memory per " + + "executor process. This is memory that accounts for things like VM overheads, " + + "interned strings, other native overheads, etc. This tends to grow with the container " + + "size. This value is ignored if spark.executor.memoryOverhead is set directly.") .version("3.3.0") .doubleConf .checkValue(factor => factor > 0, diff --git a/docs/configuration.md b/docs/configuration.md index 7e5c9101db8a2..a963e1a1f4170 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -202,9 +202,10 @@ of the most common options to set are: spark.driver.memoryOverheadFactor 0.10 - Fraction of driver memory to be allocated as additional non-heap memory per driver process - in cluster mode. This value is ignored if spark.driver.memoryOverhead is set - directly. + Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode. + This is memory that accounts for things like VM overheads, interned strings, + other native overheads, etc. This tends to grow with the container size. + This value is ignored if spark.driver.memoryOverhead is set directly. 3.3.0 @@ -302,6 +303,8 @@ of the most common options to set are: 0.10 Fraction of executor memory to be allocated as additional non-heap memory per executor process. + This is memory that accounts for things like VM overheads, interned strings, + other native overheads, etc. 
This tends to grow with the container size. This value is ignored if spark.executor.memoryOverhead is set directly. 3.3.0 From bcb764c71836e2240aeee736cf3085e302d94982 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 2 Mar 2022 08:21:56 -0500 Subject: [PATCH 09/10] Move yarn allocator memory factor to class level and add tests for setting precedence --- .../BasicDriverFeatureStepSuite.scala | 52 ++++++++++++++++++ .../BasicExecutorFeatureStepSuite.scala | 53 +++++++++++++++++++ .../spark/deploy/yarn/YarnAllocator.scala | 4 +- .../deploy/yarn/YarnAllocatorSuite.scala | 19 ++++++- 4 files changed, 124 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 9bd0429c2af94..9a6f3474eccb4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -217,6 +217,58 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { } } + test(s"SPARK-38194: memory overhead factor precendence") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + + // New config should take precedence + sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, 0.2) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + val expectedFactor = 0.2 + + val conf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf) + val step = new 
BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === "0.2") + } + + test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = + ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / DRIVER_MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + + // Old config still works if new config isn't given + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + + val conf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (driverMem + driverMem * 0.3).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === "0.3") + } + test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") { val initPod = SparkPod.initialPod() val sparkConf = new SparkConf() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index f5f2712481604..334b5c2829714 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -441,6 +441,59 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { )) } + test(s"SPARK-38194: memory overhead factor precendence") { + // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get + val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") + + // New config should take precedence + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.2) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + val expectedFactor = 0.2 + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (executorMem + executorMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + } + + test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { + // Choose an executor memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val defaultFactor = EXECUTOR_MEMORY_OVERHEAD_FACTOR.defaultValue.get + val executorMem = ResourceProfile.MEMORY_OVERHEAD_MIN_MIB / defaultFactor * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, 
"spark-driver:latest") + .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") + + // New config should take precedence + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf) + ResourceProfile.clearDefaultProfile() + val resourceProfile = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val step = new BasicExecutorFeatureStep(conf, new SecurityManager(baseConf), + resourceProfile) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) + val expected = (executorMem + executorMem * 0.3).toInt + assert(mem === s"${expected}Mi") + } + + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d922d2c028818..a85b7174673af 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -163,6 +163,8 @@ private[yarn] class YarnAllocator( private val isPythonApp = sparkConf.get(IS_PYTHON_APP) + private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) + private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS)) @@ -281,8 +283,6 @@ private[yarn] class YarnAllocator( getOrUpdateRunningExecutorForRPId(rp.id) logInfo(s"Resource profile ${rp.id} doesn't exist, adding it") - val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) - val resourcesWithDefaults = ResourceProfile.getResourcesForClusterManager(rp.id, 
rp.executorResources, memoryOverheadFactor, sparkConf, isPythonApp, resourceNameMapping) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 089a1ffb86972..ae010f11503dd 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -708,14 +708,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter } test("SPARK-38194: Configurable memory overhead factor") { - val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt + val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toLong + try { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5) + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val memory = defaultResource.getMemory + assert(memory == (executorMemory * 1.5).toLong) + } finally { + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) + } + } + + test("SPARK-38194: Memory overhead takes precedence over factor") { + val executorMemory = sparkConf.get(EXECUTOR_MEMORY) try { sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.5) + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD, (executorMemory * 0.4).toLong) val (handler, _) = createAllocator(maxExecutors = 1, additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) val memory = defaultResource.getMemory - assert(memory == (executorMemory * 1.5).toInt) + assert(memory == (executorMemory * 1.4).toLong) } finally { sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.1) } From 0bf3a2f1187907a7edecbbbf61aa0c936b77bcd8 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 2 Mar 
2022 18:50:58 -0500 Subject: [PATCH 10/10] Update docs for configs and dedupe test literals --- .../org/apache/spark/internal/config/package.scala | 6 +++++- docs/configuration.md | 4 ++++ docs/running-on-kubernetes.md | 9 --------- .../k8s/features/BasicDriverFeatureStepSuite.scala | 13 +++++++------ .../features/BasicExecutorFeatureStepSuite.scala | 9 +++++---- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9ca4ee47e7372..ffe4501248f43 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -110,7 +110,11 @@ package object config { .doc("Fraction of driver memory to be allocated as additional non-heap memory per driver " + "process in cluster mode. This is memory that accounts for things like VM overheads, " + "interned strings, other native overheads, etc. This tends to grow with the container " + - "size. This value is ignored if spark.driver.memoryOverhead is set directly.") + "size. This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to " + + "0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks " + + "commonly fail with \"Memory Overhead Exceeded\" errors. This preempts this error " + + "with a higher default. This value is ignored if spark.driver.memoryOverhead is set " + + "directly.") .version("3.3.0") .doubleConf .checkValue(factor => factor > 0, diff --git a/docs/configuration.md b/docs/configuration.md index a963e1a1f4170..a2e6797b55e2f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -205,6 +205,10 @@ of the most common options to set are: Fraction of driver memory to be allocated as additional non-heap memory per driver process in cluster mode. 
This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size. + This value defaults to 0.10 except for Kubernetes non-JVM jobs, which defaults to + 0.40. This is done as non-JVM tasks need more non-JVM heap space and such tasks + commonly fail with "Memory Overhead Exceeded" errors. This preempts this error + with a higher default. This value is ignored if spark.driver.memoryOverhead is set directly. 3.3.0 diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 8553d7886acf0..cf4aef5a84c38 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -1137,15 +1137,6 @@ See the [configuration page](configuration.html) for information on Spark config 3.0.0 - - spark.kubernetes.memoryOverheadFactor - 0.1 - - This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and tmpfs-based local directories when spark.kubernetes.local.dirs.tmpfs is true. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs. - This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This preempts this error with a higher default. 
- - 2.4.0 - spark.kubernetes.pyspark.pythonVersion "3" diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 9a6f3474eccb4..4c95041d4f139 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -228,9 +228,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") // New config should take precedence - sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, 0.2) - sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) val expectedFactor = 0.2 + sparkConf.set(DRIVER_MEMORY_OVERHEAD_FACTOR, expectedFactor) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) val conf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf) @@ -241,7 +241,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(mem === s"${expected}Mi") val systemProperties = step.getAdditionalPodSystemProperties() - assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === "0.2") + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) } test(s"SPARK-38194: old memory factor settings is applied if new one isn't given") { @@ -255,18 +255,19 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") // Old config still works if new config isn't given - sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + val expectedFactor = 0.3 + sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor) val conf = KubernetesTestConf.createDriverConf( sparkConf = sparkConf) val step = new BasicDriverFeatureStep(conf) val pod = step.configurePod(SparkPod.initialPod()) val mem = 
amountAndFormat(pod.container.getResources.getRequests.get("memory")) - val expected = (driverMem + driverMem * 0.3).toInt + val expected = (driverMem + driverMem * expectedFactor).toInt assert(mem === s"${expected}Mi") val systemProperties = step.getAdditionalPodSystemProperties() - assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === "0.3") + assert(systemProperties(DRIVER_MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) } test("SPARK-35493: make spark.blockManager.port be able to be fallen back to in driver pod") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 334b5c2829714..731a9b77d2059 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -452,9 +452,9 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") // New config should take precedence - sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, 0.2) - sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) val expectedFactor = 0.2 + sparkConf.set(EXECUTOR_MEMORY_OVERHEAD_FACTOR, expectedFactor) + sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) val conf = KubernetesTestConf.createExecutorConf( sparkConf = sparkConf) @@ -479,7 +479,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(EXECUTOR_MEMORY.key, s"${executorMem.toInt}m") // New config should take precedence - sparkConf.set(MEMORY_OVERHEAD_FACTOR, 0.3) + val expectedFactor = 0.3 + sparkConf.set(MEMORY_OVERHEAD_FACTOR, expectedFactor) val conf = KubernetesTestConf.createExecutorConf( sparkConf = sparkConf) @@ -489,7 
+490,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { resourceProfile) val pod = step.configurePod(SparkPod.initialPod()) val mem = amountAndFormat(pod.container.getResources.getRequests.get("memory")) - val expected = (executorMem + executorMem * 0.3).toInt + val expected = (executorMem + executorMem * expectedFactor).toInt assert(mem === s"${expected}Mi") }