Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,17 @@ of the most common options to set are:
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
Amount of non-heap memory to be allocated per executor process in cluster mode, in MiB unless
Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should remove interned strings, it use heap space after Java8

otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
This option is currently supported on YARN and Kubernetes.
<br/>
<em>Note:</em> Non-heap memory includes off-heap memory
(when <code>spark.memory.offHeap.enabled=true</code>) and memory used by other executor processes
(e.g. python process that goes with a PySpark executor) and memory used by other non-executor
processes running in the same container. The maximum memory size of container to running executor
is determined by the sum of <code>spark.executor.memoryOverhead</code> and
<code>spark.executor.memory</code>.
<em>Note:</em> Additional memory includes PySpark executor memory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another problem is Additional better than Non-heap?

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to explain Non-heap? Now MemoryOverHead not includes offheap, and maybe PySpark executor memory should add a default value and separate from MemoryOverHead, should we redefinition this concept.
Any other suggested names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this PR looks a little conflict with origin definition.
Non-heap includes all the memory but java heap.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is the point, we are changing it so that you don't have to include off heap inside of overhead memory. User is already specifying off heap size so why should they have to add it to overhead memory? it works just the other configs - pyspark memory, heap memory,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the meaning of this PR. Maybe the new idea is a way. As I know, the origin decision is to unify all the different part.

(when <code>spark.executor.pyspark.memory</code> is not configured) and memory used by other
non-executor processes running in the same container. The maximum memory size of container to
running executor is determined by the sum of <code>spark.executor.memoryOverhead</code>,
<code>spark.executor.memory</code>, <code>spark.memory.offHeap.size</code> and
<code>spark.executor.pyspark.memory</code>.
</td>
</tr>
<tr>
Expand Down Expand Up @@ -1357,9 +1357,6 @@ Apart from these, the following properties are also available, and may be useful
<td>
If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory
use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
<em>Note:</em> If off-heap memory is enabled, may need to raise the non-heap memory size
(e.g. increase <code>spark.driver.memoryOverhead</code> or
<code>spark.executor.memoryOverhead</code>).
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ private[spark] class Client(

// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

Expand Down Expand Up @@ -346,12 +348,14 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
val executorMem =
executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also update the doc to reflect the changes here.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll try to update description about MemoryOverhead & OffHeapMemory in configuration.md, in this pr or new one ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to change the doc in this PR.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 27, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b3b5f83 update the configuration.md.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's some strange behavior about spark.executor.pyspark.memory:
if we config spark.executor.pyspark.memory , the pyspark executor memory is Independent , but if we not config spark.executor.pyspark.memory , the memoryOverhead include it.

if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " +
s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
"of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " +
"and/or 'yarn.nodemanager.resource.memory-mb'.")
}
val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ private[yarn] class YarnAllocator(

// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
Expand All @@ -149,7 +151,7 @@ private[yarn] class YarnAllocator(
// Resource capability requested for each executor
private[yarn] val resource: Resource = {
val resource = Resource.newInstance(
executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
Copy link
Contributor

@beliefer beliefer Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According line 258 to 260 in docs/configuration.md, memoryOverhead includes pysparkWorkerMemory, but looks difference here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Described in the document is Additional memory includes PySpark executor memory (when <code>spark.executor.pyspark.memory</code> is not configured), I think we need a new jira to discuss how to solve this problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes me confused.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this what spark.executor.memoryOverhead already adds via memoryOverhead?
I guess I'm wondering what MEMORY_OFFHEAP_SIZE does that's supposed to be different.

Copy link
Contributor Author

@LuciferYang LuciferYang Sep 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen memoryOverhead include MEMORY_OFFHEAP_SIZE before this pr, and memoryOverhead and MEMORY_OFFHEAP_SIZE had to be modified at the same time to ensure request enough resources from yarn if I want to increase MEMORY_OFFHEAP_SIZE , this is not user friendly, and this has always been confusing, why we need to modify two memory-related parameters simultaneously for one purpose? This pr let them be independent.

ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
logDebug(s"Created resource capability: $resource")
resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.util.Utils

object YarnSparkHadoopUtil {
Expand Down Expand Up @@ -184,4 +183,17 @@ object YarnSparkHadoopUtil {
ConverterUtils.toContainerId(containerIdString)
}

/**
* Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
require(sizeInMB > 0,
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check if MEMORY_OFFHEAP_SIZE could equal to 0. The definition of MEMORY_OFFHEAP_SIZE checks that it could be >= 0.

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little conflict with the MemoryManager as following:
image

if MEMORY_OFFHEAP_ENABLED is enable, MemoryManager.tungstenMemoryMode will enter OFF_HEAP branch and need MEMORY_OFFHEAP_SIZE > 0 and I think we should be consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then I think we should change the code here.

.checkValue(_ >= 0, "The off-heap memory size must not be negative")

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 is defaultValue, change to

.checkValue(_ > 0, "The off-heap memory size must be positive")
.createWithDefault(1)

?
otherwise will throw IllegalArgumentException when offHeapEnabled is false and defaultValue is 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should give a suitable defaultValue ,like 1073741824(1g)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerryshao Do we need to change 0 to a suitable defaultValue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, then I would suggest not to change it. Seems there's no good value which could cover most of the scenarios, so good to leave as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok~

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its odd that check is >= 0 in the config, seems like we should change but can you file a separate jira for that?

Copy link
Contributor Author

@LuciferYang LuciferYang Aug 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK~ I will add a new jira to discuss this issue.

sizeInMB
} else {
0
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,4 +514,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
verify(rmClientSpy)
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
}

test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
"when offHeapEnabled is true.") {
val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)
val originalOffHeapSize = sparkConf.get(MEMORY_OFFHEAP_SIZE)
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val offHeapMemoryInMB = 1024L
val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024
try {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
val allocator = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
val memory = allocator.resource.getMemory
assert(memory ==
executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
} finally {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.util.{ResetSystemProperties, Utils}

Expand Down Expand Up @@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}

}

test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
assert(executorOffHeapMemory == 0)
}

test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
val offHeapMemoryInMB = 50
val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
assert(executorOffHeapMemory == offHeapMemoryInMB)
}

test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if we could add some yarn side UT to verify the container memory size, rather than verifying the correctness of off-heap configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok ~ I'll try to add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new test suite SPARK-28577#YarnAllocator.resource.memory should include offHeapSize when offHeapEnabled is true. in YarnAllocatorSuite

"but MEMORY_OFFHEAP_SIZE not config scene") {
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
val expected =
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
val message = intercept[IllegalArgumentException] {
YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
}.getMessage
assert(message.contains(expected))
}
}