-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-28577][YARN] Add offHeapMemorySize to the resource capability requested for each executor #25309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
13b81d2
ff9e2e4
cd27192
c44a33e
9421fbe
c0ef860
4fb5362
66ae9a1
c03db87
8f37ca1
40ad336
0020a02
02c0f2a
b3b5f83
bb29488
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P | |
| import org.apache.hadoop.yarn.util.ConverterUtils | ||
|
|
||
| import org.apache.spark.{SecurityManager, SparkConf} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config._ | ||
| import org.apache.spark.launcher.YarnCommandBuilderUtils | ||
| import org.apache.spark.resource.ResourceID | ||
| import org.apache.spark.resource.ResourceUtils._ | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| object YarnSparkHadoopUtil { | ||
| object YarnSparkHadoopUtil extends Logging { | ||
|
|
||
| // Additional memory overhead | ||
| // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering | ||
|
|
@@ -184,4 +184,24 @@ object YarnSparkHadoopUtil { | |
| ConverterUtils.toContainerId(containerIdString) | ||
| } | ||
|
|
||
/**
 * Returns the executor memory overhead (in MiB) that should be requested from YARN.
 *
 * If MEMORY_OFFHEAP_ENABLED is true, the requested overhead must be at least
 * MEMORY_OFFHEAP_SIZE; otherwise the memory resource requested for the executor
 * container may not be enough to cover its off-heap allocations.
 */
def executorMemoryOverheadRequested(sparkConf: SparkConf): Int = {
  val executorMemoryMiB = sparkConf.get(EXECUTOR_MEMORY).toInt
  // An explicitly configured overhead wins; otherwise derive it as a fraction of
  // executor memory, bounded below by MEMORY_OVERHEAD_MIN.
  val overhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD)
    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN))
    .toInt
  val offHeapMiB =
    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
      // getSizeAsMb normalizes the byte-denominated config value to MiB.
      val size =
        sparkConf.getSizeAsMb(MEMORY_OFFHEAP_SIZE.key, MEMORY_OFFHEAP_SIZE.defaultValueString)
      require(size > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      logInfo(s"spark.memory.offHeap.enabled is true, spark.memory.offHeap.size is $size, " +
        s"overhead is $overhead, will choose the bigger as memoryOverhead.")
      size
    } else {
      0L
    }
  // offHeapMiB is a Long, so narrow the winner back to Int for the YARN request.
  math.max(overhead, offHeapMiB).toInt
}
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,9 @@ import org.scalatest.Matchers | |
|
|
||
| import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} | ||
| import org.apache.spark.deploy.SparkHadoopUtil | ||
| import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.internal.config._ | ||
| import org.apache.spark.internal.config.UI._ | ||
| import org.apache.spark.util.{ResetSystemProperties, Utils} | ||
|
|
||
|
|
@@ -140,4 +142,69 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging | |
| } | ||
|
|
||
| } | ||
|
|
||
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
    "use MEMORY_OVERHEAD_MIN scene") {
  // With a default SparkConf the factor-derived overhead falls below the floor,
  // so the minimum overhead constant is what gets requested.
  val overhead = YarnSparkHadoopUtil.executorMemoryOverheadRequested(new SparkConf())
  assert(overhead == MEMORY_OVERHEAD_MIN)
}
|
|
||
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
    "use MEMORY_OVERHEAD_FACTOR * executorMemory scene") {
  // Executor memory is large enough that the factor-derived overhead exceeds
  // MEMORY_OVERHEAD_MIN and is therefore chosen.
  val executorMemory: Long = 5000
  val conf = new SparkConf().set(EXECUTOR_MEMORY, executorMemory)
  val overhead = YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf)
  assert(overhead == executorMemory * MEMORY_OVERHEAD_FACTOR)
}
|
|
||
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
    "use EXECUTOR_MEMORY_OVERHEAD config value scene") {
  // An explicitly configured overhead takes precedence over any derived value.
  val memoryOverhead: Long = 100
  val conf = new SparkConf().set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
  val overhead = YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf)
  assert(overhead == memoryOverhead)
}
|
|
||
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
    "use EXECUTOR_MEMORY_OVERHEAD config value scene") {
  // Off-heap is enabled but its size (50 MiB) is below the configured overhead
  // (100 MiB), so the configured overhead still wins.
  val memoryOverhead: Long = 100
  val offHeapMemory: Long = 50 * 1024 * 1024
  val conf = new SparkConf()
    .set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
    .set(MEMORY_OFFHEAP_ENABLED, true)
    .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
  val overhead = YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf)
  assert(overhead == memoryOverhead)
}
|
|
||
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
    "use MEMORY_OFFHEAP_SIZE config value scene") {
  // Off-heap size (100 MiB) exceeds the configured overhead (50 MiB), so the
  // off-heap size, converted to MiB, is what gets requested.
  val memoryOverhead: Long = 50
  val offHeapMemoryInMB = 100
  val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
  val conf = new SparkConf()
    .set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
    .set(MEMORY_OFFHEAP_ENABLED, true)
    .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
  val overhead = YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf)
  assert(overhead == offHeapMemoryInMB)
}
|
|
||
// NOTE(review thread, resolved): a reviewer asked for a YARN-side unit test
// verifying the requested container memory size rather than only the off-heap
// configuration; the author added a separate test suite for that.
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
    "but MEMORY_OFFHEAP_SIZE not config scene") {
  // Enabling off-heap without a positive size must fail fast via require().
  val memoryOverhead: Long = 50
  val conf = new SparkConf()
    .set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
    .set(MEMORY_OFFHEAP_ENABLED, true)
  val expected = "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true"
  val message = intercept[IllegalArgumentException] {
    YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf)
  }.getMessage
  assert(message.contains(expected))
}
| } | ||

Uh oh!
There was an error while loading. Please reload this page.