# [SPARK-28577][YARN] Resource capability requested for each executor add offHeapMemorySize #25309
`YarnAllocator.scala`:

```diff
@@ -131,6 +131,8 @@ private[yarn] class YarnAllocator(
   // Executor memory in MiB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
+  // Executor offHeap memory in MiB.
+  protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
   // Additional memory overhead.
   protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt

@@ -149,7 +151,7 @@ private[yarn] class YarnAllocator(
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
     val resource = Resource.newInstance(
-      executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
+      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
```
**Contributor:** According line 258 to 260 in

**Author:** Described in the document is

**Contributor:** Yes, this makes me confused.

**Member:** Isn't this what

**Author:** @srowen
```diff
     ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
     logDebug(s"Created resource capability: $resource")
     resource
```
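With this change, the YARN container request is the sum of four components: heap, off-heap, overhead, and PySpark worker memory. The arithmetic can be sketched as follows (a rough illustration, not Spark API; the 0.10 factor and 384 MiB floor mirror Spark's default `MEMORY_OVERHEAD_FACTOR` and `MEMORY_OVERHEAD_MIN`, and the function name is made up for this example):

```python
def container_memory_mb(executor_memory_mb, off_heap_mb=0,
                        pyspark_worker_mb=0, overhead_mb=None):
    """Approximate the per-executor memory requested from YARN, in MiB."""
    if overhead_mb is None:
        # Default overhead: max(10% of heap, 384 MiB), truncating like Scala's toInt.
        overhead_mb = max(int(0.10 * executor_memory_mb), 384)
    return executor_memory_mb + off_heap_mb + overhead_mb + pyspark_worker_mb

# A 4 GiB heap with 2 GiB off-heap: overhead = max(409, 384) = 409 MiB.
print(container_memory_mb(4096, off_heap_mb=2048))  # 6553
```

Before this PR the `off_heap_mb` term was simply absent, so executors using off-heap memory could exceed their container allocation and be killed by YARN.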
`YarnSparkHadoopUtil.scala`:

```diff
@@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
 import org.apache.hadoop.yarn.util.ConverterUtils

 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils

 object YarnSparkHadoopUtil {

@@ -184,4 +183,17 @@ object YarnSparkHadoopUtil {
     ConverterUtils.toContainerId(containerIdString)
   }

+  /**
+   * Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
+   */
+  def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
+    if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
+      val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
+      require(sizeInMB > 0,
+        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
```
**Contributor:** Please check if

**Contributor:** Then I think we should change the code here.

**Author:** 0 is the defaultValue, change to ?

**Author:** Maybe we should give a suitable defaultValue, like 1073741824 (1g)?

**Author:** @jerryshao Do we need to change 0 to a suitable defaultValue?

**Contributor:** I see, then I would suggest not changing it. There seems to be no value that would cover most scenarios, so it's good to leave as it is.

**Author:** ok~

**Contributor:** It's odd that the check is `>= 0` in the config; seems like we should change that, but can you file a separate JIRA for it?

**Author:** OK~ I will file a new JIRA to discuss this issue.
```diff
+      sizeInMB
+    } else {
+      0
+    }
+  }
 }
```
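The new helper's logic is easy to restate outside Spark. A minimal sketch, assuming the configured size is already in bytes (the real `Utils.memoryStringToMb` also accepts suffixed strings like `2g`; the function name below is illustrative, not Spark API):

```python
def executor_off_heap_memory_size_as_mb(enabled, off_heap_size_bytes):
    """Return 0 when off-heap memory is disabled; otherwise convert the
    configured byte size to MiB (floor division, like memoryStringToMb)
    and reject sizes that round down to zero."""
    if not enabled:
        return 0
    size_in_mb = off_heap_size_bytes // (1024 * 1024)
    if size_in_mb <= 0:
        raise ValueError(
            "spark.memory.offHeap.size must be > 0 "
            "when spark.memory.offHeap.enabled == true")
    return size_in_mb
```

The `require` (here a `ValueError`) is what the thread above debates: the config itself only checks `>= 0` with a default of 0, so enabling off-heap memory without setting a size must fail fast here rather than silently requesting nothing.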
`YarnSparkHadoopUtilSuite.scala`:

```diff
@@ -28,6 +28,7 @@ import org.scalatest.Matchers
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
```
```diff
@@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
   }

+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
+    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
+    assert(executorOffHeapMemory == 0)
+  }
+
+  test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
+    val offHeapMemoryInMB = 50
+    val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+      .set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
+    val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
+    assert(executorOffHeapMemory == offHeapMemoryInMB)
+  }
+
+  test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
```
**Contributor:** Just wondering if we could add some YARN-side UT to verify the container memory size, rather than verifying the correctness of the off-heap configuration.

**Author:** ok ~ I'll try to add it.

**Author:** Added a new test suite.
```diff
+    "but MEMORY_OFFHEAP_SIZE not config scene") {
+    val sparkConf = new SparkConf()
+      .set(MEMORY_OFFHEAP_ENABLED, true)
+    val expected =
+      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
+    val message = intercept[IllegalArgumentException] {
+      YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
+    }.getMessage
+    assert(message.contains(expected))
+  }
 }
```

**Reviewer:** I think we should also update the doc to reflect the changes here.
**Author:** OK, I'll try to update the description of memoryOverhead & off-heap memory in configuration.md. In this PR or a new one?

**Reviewer:** It would be better to change the doc in this PR.
**Author:** b3b5f83 updates configuration.md.

**Author:** There's some strange behavior around `spark.executor.pyspark.memory`: if we configure `spark.executor.pyspark.memory`, the PySpark executor memory is an independent term, but if we don't configure it, `memoryOverhead` has to include it.
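The behavior described in that last comment can be sketched as follows (illustrative only, not Spark API; it shows only the pyspark term and omits the off-heap term this PR adds):

```python
def yarn_request_mb(executor_memory_mb, overhead_mb, pyspark_memory_mb=None):
    """When spark.executor.pyspark.memory is set, it is added to the
    container request as its own term; when it is unset, the Python
    workers must fit inside memoryOverhead, which stays unchanged."""
    pyspark_worker_mb = pyspark_memory_mb if pyspark_memory_mb is not None else 0
    return executor_memory_mb + overhead_mb + pyspark_worker_mb

# Explicit pyspark memory grows the container; unset leaves it to the overhead.
print(yarn_request_mb(4096, 409, 512))  # 5017
print(yarn_request_mb(4096, 409))       # 4505
```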