diff --git a/docs/configuration.md b/docs/configuration.md index 93a4fccc1743..796ff11cc9ab 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -250,17 +250,17 @@ of the most common options to set are: spark.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 - Amount of non-heap memory to be allocated per executor process in cluster mode, in MiB unless + Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.
- Note: Non-heap memory includes off-heap memory - (when spark.memory.offHeap.enabled=true) and memory used by other executor processes - (e.g. python process that goes with a PySpark executor) and memory used by other non-executor - processes running in the same container. The maximum memory size of container to running executor - is determined by the sum of spark.executor.memoryOverhead and - spark.executor.memory. + Note: Additional memory includes PySpark executor memory + (when spark.executor.pyspark.memory is not configured) and memory used by other + non-executor processes running in the same container. The maximum memory size of container to + running executor is determined by the sum of spark.executor.memoryOverhead, + spark.executor.memory, spark.memory.offHeap.size and + spark.executor.pyspark.memory. @@ -1357,9 +1357,6 @@ Apart from these, the following properties are also available, and may be useful If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive. - Note: If off-heap memory is enabled, may need to raise the non-heap memory size - (e.g. increase spark.driver.memoryOverhead or - spark.executor.memoryOverhead). diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 651e706021fc..e04d2b0d1d96 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -97,6 +97,8 @@ private[spark] class Client( // Executor related configurations private val executorMemory = sparkConf.get(EXECUTOR_MEMORY) + // Executor offHeap memory in MiB. + protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf) private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt @@ -346,12 +348,14 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory + val executorMem = + executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " + - s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " + - s"the max threshold ($maxMem MB) of this cluster! Please check the values of " + - s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.") + throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " + + s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " + + s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " + + "of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " + + "and/or 'yarn.nodemanager.resource.memory-mb'.") } val amMem = amMemory + amMemoryOverhead if (amMem > maxMem) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 8ec7bd66b250..f68be33e057b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -131,6 +131,8 @@ private[yarn] class YarnAllocator( // Executor memory in MiB. protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt + // Executor offHeap memory in MiB. + protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf) // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt @@ -149,7 +151,7 @@ private[yarn] class YarnAllocator( // Resource capability requested for each executor private[yarn] val resource: Resource = { val resource = Resource.newInstance( - executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores) + executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores) ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource) logDebug(s"Created resource capability: $resource") resource diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 11035520ae18..9cefc4011c93 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils -import org.apache.spark.resource.ResourceID -import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.util.Utils object YarnSparkHadoopUtil { @@ -184,4 +183,17 @@ object YarnSparkHadoopUtil { ConverterUtils.toContainerId(containerIdString) } + /** + * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false. + */ + def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = { + if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) { + val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString) + require(sizeInMB > 0, + s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true") + sizeInMB + } else { + 0 + } + } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 4ac27ede6483..6f47a418f918 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -514,4 +514,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter verify(rmClientSpy) .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList()) } + + test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " + + "when offHeapEnabled is true.") { + val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED) + val originalOffHeapSize = sparkConf.get(MEMORY_OFFHEAP_SIZE) + val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt + val offHeapMemoryInMB = 1024L + val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024 + try { + sparkConf.set(MEMORY_OFFHEAP_ENABLED, true) + sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte) + val allocator = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString)) + val memory = allocator.resource.getMemory + assert(memory == + executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN) + } finally { + sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled) + sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize) + } + } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index e7cde03a01b4..c88bb292aa77 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.Matchers import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } } + + test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") { + val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf()) + assert(executorOffHeapMemory == 0) + } + + test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") { + val offHeapMemoryInMB = 50 + val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024 + val sparkConf = new SparkConf() + .set(MEMORY_OFFHEAP_ENABLED, true) + .set(MEMORY_OFFHEAP_SIZE, offHeapMemory) + val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf) + assert(executorOffHeapMemory == offHeapMemoryInMB) + } + + test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " + + "but MEMORY_OFFHEAP_SIZE not config scene") { + val sparkConf = new SparkConf() + .set(MEMORY_OFFHEAP_ENABLED, true) + val expected = + s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true" + val message = intercept[IllegalArgumentException] { + YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf) + }.getMessage + assert(message.contains(expected)) + } }