Client.scala
@@ -97,8 +97,8 @@ private[spark] class Client(

   // Executor related configurations
   private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
-  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+  private val executorMemoryOverhead =
+    YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)

   private val isPython = sparkConf.get(IS_PYTHON_APP)
   private val pysparkWorkerMemory: Int = if (isPython) {

YarnAllocator.scala
@@ -132,8 +132,7 @@ private[yarn] class YarnAllocator(

   // Executor memory in MiB.
   protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
   // Additional memory overhead.
-  protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
-    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
+  protected val memoryOverhead: Int = YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
   protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
     sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
   } else {
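
For context, the review thread below notes that YarnAllocator combines these fields into the container request. A minimal sketch of that computation, with hypothetical values (the exact Resource construction is an assumption based on the discussion, not lines shown in this diff):

```scala
import org.apache.hadoop.yarn.api.records.Resource

// Sketch: how the three components sum into the YARN container memory request,
// per the review discussion below. All values are hypothetical, in MiB.
val executorMemory = 4096      // spark.executor.memory
val memoryOverhead = 410       // now YarnSparkHadoopUtil.executorMemoryOverheadRequested(...)
val pysparkWorkerMemory = 0    // spark.executor.pyspark.memory; 0 for non-Python apps
val executorCores = 2

val resource: Resource = Resource.newInstance(
  executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
```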

YarnSparkHadoopUtil.scala
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
 import org.apache.hadoop.yarn.util.ConverterUtils

 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
 import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils

-object YarnSparkHadoopUtil {
+object YarnSparkHadoopUtil extends Logging {

   // Additional memory overhead
   // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering
@@ -184,4 +184,29 @@ object YarnSparkHadoopUtil {
    ConverterUtils.toContainerId(containerIdString)
  }

  /**
   * If MEMORY_OFFHEAP_ENABLED is true, ensure that the requested executor memory overhead
   * is at least MEMORY_OFFHEAP_SIZE; otherwise the memory requested for the executor
   * may not be enough.
   */
  def executorMemoryOverheadRequested(sparkConf: SparkConf): Int = {
    val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
    val overhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
    val offHeap = if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
      val size =
        sparkConf.getSizeAsMb(MEMORY_OFFHEAP_SIZE.key, MEMORY_OFFHEAP_SIZE.defaultValueString)
      require(size > 0,
        s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")

Contributor:
Please check whether MEMORY_OFFHEAP_SIZE could equal 0. The definition of MEMORY_OFFHEAP_SIZE only checks that it is >= 0.

Contributor Author (@LuciferYang, Aug 8, 2019):
This conflicts a little with the MemoryManager, shown below:

[screenshot: MemoryManager.tungstenMemoryMode]

If MEMORY_OFFHEAP_ENABLED is enabled, MemoryManager.tungstenMemoryMode will enter the OFF_HEAP branch, which requires MEMORY_OFFHEAP_SIZE > 0, and I think we should be consistent.
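
The screenshot itself is not preserved; it presumably showed the check being referenced, which looks roughly like the following sketch (reconstructed from memory, not the verbatim Spark source):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE}
import org.apache.spark.memory.MemoryMode

// Reconstruction of the tungstenMemoryMode logic the comment points at: when
// off-heap is enabled, a strictly positive MEMORY_OFFHEAP_SIZE is required,
// which is the consistency argument being made here.
def tungstenMemoryMode(conf: SparkConf): MemoryMode = {
  if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
    require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
      "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
    MemoryMode.OFF_HEAP
  } else {
    MemoryMode.ON_HEAP
  }
}
```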

Contributor:
Then I think we should change the code here:

.checkValue(_ >= 0, "The off-heap memory size must not be negative")
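
For reference, the definition under discussion is approximately the following (a reconstruction, not verbatim); per the thread below, tightening the check to `_ > 0` while keeping the default of 0 would make any read of the default throw IllegalArgumentException:

```scala
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

// Approximate shape of MEMORY_OFFHEAP_SIZE at the time: the check admits 0,
// and 0 is also the default, so `_ > 0` alone would reject the default value.
val MEMORY_OFFHEAP_SIZE = ConfigBuilder("spark.memory.offHeap.size")
  .doc("The absolute amount of memory which can be used for off-heap allocation.")
  .bytesConf(ByteUnit.BYTE)
  .checkValue(_ >= 0, "The off-heap memory size must not be negative")
  .createWithDefault(0)
```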

Contributor Author (@LuciferYang, Aug 8, 2019):
0 is the default value. Change it to the following?

.checkValue(_ > 0, "The off-heap memory size must be positive")
.createWithDefault(1)

Otherwise it will throw IllegalArgumentException when offHeapEnabled is false and the default value is 0.

Contributor Author (@LuciferYang):
Maybe we should give it a suitable default value, like 1073741824 (1g)?

Contributor Author (@LuciferYang):
@jerryshao Do we need to change 0 to a suitable default value?

Contributor:
I see; then I would suggest not changing it. There seems to be no single value that covers most scenarios, so it's best to leave it as it is.

Contributor Author (@LuciferYang):
ok~

Contributor:
It's odd that the check in the config is >= 0; it seems like we should change it, but can you file a separate JIRA for that?

Contributor Author (@LuciferYang, Aug 23, 2019):
OK~ I will file a new JIRA to discuss this issue.

      if (size > overhead) {
        logWarning(s"The value of ${MEMORY_OFFHEAP_SIZE.key} (${size}MB) will be used as the " +
          s"executor memory overhead when requesting resources, to ensure that the executor " +
          s"has enough memory. It is recommended that ${EXECUTOR_MEMORY_OVERHEAD.key} be set " +
          s"to no less than ${MEMORY_OFFHEAP_SIZE.key} when ${MEMORY_OFFHEAP_ENABLED.key} " +
          s"is true.")
      }
      size
    } else 0
    math.max(overhead, offHeap).toInt

Contributor (@jerryshao, Aug 2, 2019):
I was wondering if it would be better to change this to overhead = overhead + offHeap when off-heap is enabled. Off-heap memory is not only used by Spark itself, but also by Netty and other native libraries. If we only guarantee overhead > offHeap, it would somewhat eat into the portion meant for Netty and the others. Just my two cents :).
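
To make the two sizing policies under discussion concrete, a minimal sketch with hypothetical MiB values:

```scala
// Hypothetical sizes in MiB.
val overhead = 409L  // max(0.10 * executorMemory, 384), or the user's explicit setting
val offHeap = 2048L  // spark.memory.offHeap.size, when off-heap is enabled

// Policy in this PR: the requested overhead must at least cover off-heap usage.
val requestedMax = math.max(overhead, offHeap)  // 2048

// Suggested alternative: off-heap is requested on top of the overhead, leaving
// the overhead itself free for Netty and other native allocations.
val requestedSum = overhead + offHeap           // 2457
```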

Contributor Author (@LuciferYang, Aug 2, 2019):
Got it~ So should we add two fields, such as isOffHeapEnabled and executorOffHeapMemory, to YarnAllocator, then use executorMemory + memoryOverhead + pysparkWorkerMemory + executorOffHeapMemory to request resources, and no longer modify memoryOverhead?

Contributor:
Hmm, that's a bit complex as of now:

  1. If we assume overhead memory includes all the off-heap memory Spark uses (everything), then the user must be aware of the different off-heap memory settings and carefully set the overhead number to cover all of them.
  2. If we assume overhead memory covers only additional memory usage not explicitly set through Spark (unlike off-heap memory), then the overall executor memory should add up all the components mentioned above.

I think it would be better to involve others' opinions. CC @vanzin @tgravescs.

Contributor:
Yeah, I always thought it was a bit weird that off-heap was just included in the overhead, but I never took the time to go back and see if it was discussed.

I think it's better to specifically add the off-heap memory instead of including it in the overhead, just like we did for the pyspark memory: executorMemory + memoryOverhead + pysparkWorkerMemory + executorOffHeapMemory. I think that keeps things more consistent and obvious to the user.
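
A sketch of the proposed accounting, with hypothetical values (executorOffHeapMemory is the new field suggested above, not something in this diff):

```scala
// Hypothetical sizes in MiB; executorOffHeapMemory is the field proposed above.
val executorMemory = 4096         // spark.executor.memory
val memoryOverhead = 409          // JVM/Netty/native overhead only
val pysparkWorkerMemory = 512     // spark.executor.pyspark.memory, if set
val executorOffHeapMemory = 2048  // spark.memory.offHeap.size, if enabled

// The container request mirrors the pyspark-memory treatment: every component
// is its own explicit term rather than folded into the overhead.
val containerMemoryMiB =
  executorMemory + memoryOverhead + pysparkWorkerMemory + executorOffHeapMemory  // 7065
```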

Contributor Author (@LuciferYang):
@tgravescs Agreed. Overhead should describe memory not used by Spark itself, such as memory used by Netty or the JVM, as @jerryshao said, and we should describe that clearly in the configuration documentation.

So, change to using executorMemory + memoryOverhead + pysparkWorkerMemory + executorOffHeapMemory to request resources?

Contributor Author (@LuciferYang, Aug 6, 2019):
[screenshot: YarnAllocator, around line 150]

@beliefer Currently YarnAllocator line 150 uses executorMemory + memoryOverhead + pysparkWorkerMemory to create the Resource instance. Is this wrong?

Contributor Author (@LuciferYang):
On the other hand, if the user configures offHeapMemory and pysparkWorkerMemory, they still need to configure the overhead memory and ensure the configuration is reasonable (memoryOverhead > offHeapMemory + pysparkWorkerMemory) in YARN mode, so users may need to care about more details.

Contributor Author (@LuciferYang):
@jerryshao Is the current approach feasible?

Contributor (@beliefer, Aug 6, 2019):
I have checked the code and docs, and there are some inconsistencies. According to the docs, memoryOverhead should comprise pysparkWorkerMemory, but the code behaves differently. We need to fix the inconsistency. I think we should reduce the number of parameters that control memory, because that is simpler. @JoshRosen Could you take a look at this PR?

Contributor:
I agree with @tgravescs's opinion.

Yeah, I understand that; if we are going to change it, 3.0 is a good time to change that behavior. Like I said, I found including off-heap memory in the overhead confusing: there is already a separate config for it, so why should I as a user have to add it into another config?

If overhead memory includes off-heap memory, pysparkWorkerMemory, and others, it is hard for users to set a proper overhead value: they would have to know every other setting and work out a proper number. For 3.0, I think we should give overhead memory a good definition, even if it is inconsistent with older versions.

  }
}
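
A quick worked example of the new helper's behavior (hypothetical values, not part of the diff):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// With 4g executor memory and no explicit overhead, the base overhead is
// max(0.10 * 4096, 384) = 409 MB; off-heap is enabled at 2g = 2048 MB, which
// is larger, so 2048 is requested (and the warning above is logged).
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")

assert(YarnSparkHadoopUtil.executorMemoryOverheadRequested(conf) == 2048)
```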

YarnSparkHadoopUtilSuite.scala
@@ -27,7 +27,9 @@ import org.scalatest.Matchers

 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -140,4 +142,69 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}

}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
"use MEMORY_OVERHEAD_MIN scene") {
val executorMemoryOverhead =
YarnSparkHadoopUtil.executorMemoryOverheadRequested(new SparkConf())
assert(executorMemoryOverhead == MEMORY_OVERHEAD_MIN)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
"use MEMORY_OVERHEAD_FACTOR * executorMemory scene") {
val executorMemory: Long = 5000
val sparkConf = new SparkConf().set(EXECUTOR_MEMORY, executorMemory)
val executorMemoryOverhead =
YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
assert(executorMemoryOverhead == executorMemory * MEMORY_OVERHEAD_FACTOR)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is false, " +
"use EXECUTOR_MEMORY_OVERHEAD config value scene") {
val memoryOverhead: Long = 100
val sparkConf = new SparkConf().set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
val executorMemoryOverhead =
YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
assert(executorMemoryOverhead == memoryOverhead)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
"use EXECUTOR_MEMORY_OVERHEAD config value scene") {
val memoryOverhead: Long = 100
val offHeapMemory: Long = 50 * 1024 * 1024
val sparkConf = new SparkConf()
.set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorMemoryOverhead =
YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
assert(executorMemoryOverhead == memoryOverhead)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
"use MEMORY_OFFHEAP_SIZE config value scene") {
val memoryOverhead: Long = 50
val offHeapMemoryInMB = 100
val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
val sparkConf = new SparkConf()
.set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorMemoryOverhead =
YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
assert(executorMemoryOverhead == offHeapMemoryInMB)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if we could add some yarn side UT to verify the container memory size, rather than verifying the correctness of off-heap configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok ~ I'll try to add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new test suite SPARK-28577#YarnAllocator.resource.memory should include offHeapSize when offHeapEnabled is true. in YarnAllocatorSuite

"but MEMORY_OFFHEAP_SIZE not config scene") {
    val memoryOverhead: Long = 50
    val sparkConf = new SparkConf()
      .set(EXECUTOR_MEMORY_OVERHEAD, memoryOverhead)
      .set(MEMORY_OFFHEAP_ENABLED, true)
    val expected = "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true"
    val message = intercept[IllegalArgumentException] {
      YarnSparkHadoopUtil.executorMemoryOverheadRequested(sparkConf)
    }.getMessage
    assert(message.contains(expected))
  }
}