Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ private[spark] class Client(

// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

Expand Down Expand Up @@ -346,12 +348,14 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
val executorMem =
executorMemory + executorOffHeapMemory +executorMemoryOverhead + pysparkWorkerMemory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

white space after +.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " +
s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
s"of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " +
s"and/or 'yarn.nodemanager.resource.memory-mb'.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to add string interpolation for this line and above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

02c0f2a fix this

}
val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ private[yarn] class YarnAllocator(

// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
Expand All @@ -149,7 +151,7 @@ private[yarn] class YarnAllocator(
// Resource capability requested for each executor
private[yarn] val resource: Resource = {
val resource = Resource.newInstance(
executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
Copy link
Contributor

@beliefer beliefer Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According line 258 to 260 in docs/configuration.md, memoryOverhead includes pysparkWorkerMemory, but looks difference here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Described in the document is Additional memory includes PySpark executor memory (when <code>spark.executor.pyspark.memory</code> is not configured), I think we need a new jira to discuss how to solve this problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes me confused.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this what spark.executor.memoryOverhead already adds via memoryOverhead?
I guess I'm wondering what MEMORY_OFFHEAP_SIZE does that's supposed to be different.

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen memoryOverhead include MEMORY_OFFHEAP_SIZE before this pr, and memoryOverhead and MEMORY_OFFHEAP_SIZE had to be modified at the same time to ensure request enough resources from yarn if I want to increase MEMORY_OFFHEAP_SIZE , this is not user friendly, and this has always been confusing, why we need to modify two memory-related parameters simultaneously for one purpose? This pr let them be independent.

ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
logDebug(s"Created resource capability: $resource")
resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.util.Utils

object YarnSparkHadoopUtil {
Expand Down Expand Up @@ -184,4 +183,18 @@ object YarnSparkHadoopUtil {
ConverterUtils.toContainerId(containerIdString)
}

/**
* Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
val sizeInMB =
sparkConf.getSizeAsMb(MEMORY_OFFHEAP_SIZE.key, MEMORY_OFFHEAP_SIZE.defaultValueString).toInt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personally not a fan of this way, can we just do a. conf.get(MEMORY_OFFHEAP_SIZE) and then call the byteStringAsMb. That way we aren't explicitly having to get key and default value and we are bypassing the ConfigBuilder stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8f37ca1 change to use Utils.byteStringAsMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString).toInt to conversion config value to sizeInMB.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tgravescs Refer to api comments,
We have to use s"${sparkConf.get(MEMORY_OFFHEAP_SIZE)}B" as input to call Utils.byteStringAsMb(str: String) because byte string should 50b, 100k, or 250m,looks a little strange, It feels like the old version looks more comfortable.

require(sizeInMB > 0,
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check if MEMORY_OFFHEAP_SIZE could equal to 0. The definition of MEMORY_OFFHEAP_SIZE checks that it could be >= 0.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little conflict with the MemoryManager as following:
image

if MEMORY_OFFHEAP_ENABLED is enable, MemoryManager.tungstenMemoryMode will enter OFF_HEAP branch and need MEMORY_OFFHEAP_SIZE > 0 and I think we should be consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I think we should change the code here.

.checkValue(_ >= 0, "The off-heap memory size must not be negative")

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 is defaultValue, change to

.checkValue(_ > 0, "The off-heap memory size must be positive")
.createWithDefault(1)

?
otherwise will throw IllegalArgumentException when offHeapEnabled is false and defaultValue is 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should give a suitable defaultValue ,like 1073741824(1g)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao Do we need to change 0 to a suitable defaultValue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, then I would suggest not to change it. Seems there's no good value which could cover most of the scenarios, so good to leave as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its odd that check is >= 0 in the config, seems like we should change but can you file a separate jira for that?

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK~ I will add a new jira to discuss this issue.

sizeInMB
} else {
0
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.util.{ResetSystemProperties, Utils}

Expand Down Expand Up @@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}

}

test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
assert(executorOffHeapMemory == 0)
}

test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
val offHeapMemoryInMB = 50
val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
assert(executorOffHeapMemory == offHeapMemoryInMB)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if we could add some yarn side UT to verify the container memory size, rather than verifying the correctness of off-heap configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok ~ I'll try to add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new test suite SPARK-28577#YarnAllocator.resource.memory should include offHeapSize when offHeapEnabled is true. in YarnAllocatorSuite

"but MEMORY_OFFHEAP_SIZE not config scene") {
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
val expected =
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
val message = intercept[IllegalArgumentException] {
YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
}.getMessage
assert(message.contains(expected))
}
}