[SPARK-54354][SQL] Fix Spark hanging when there's not enough JVM heap memory for broadcast hashed relation #53065
Conversation
@HyukjinKwon @yaooqinn @dongjoon-hyun Thanks.

cc @cloud-fan
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
| test("UnsafeHashedRelation should throw OOM when there isn't enough memory") { |
why did it hang before?
It's related to logic introduced in PR #11095. In that PR, the following "retry code" is based on the assumption that the JVM heap could be slightly smaller than the on-heap size specified in UMM:
The code assumes the on-heap size specified in UMM is only finitely larger than the actual JVM heap size, so the call returns as soon as the current size plus the acquiredButNotUsed size reaches the specified heap size limit.
However, we set the on-heap size to an effectively infinite value for the broadcast hashed relation (a simplified sketch of the resulting retry behaviour follows the quoted code below):
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Lines 142 to 150 in 722bcc0
```scala
val mm = Option(taskMemoryManager).getOrElse {
  new TaskMemoryManager(
    new UnifiedMemoryManager(
      new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
      Long.MaxValue,
      Long.MaxValue / 2,
      1),
    0)
}
```
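To make the hang concrete, here is a minimal, hypothetical sketch of the retry behaviour described above. It is not the real `TaskMemoryManager` code; `allocateWithRetry`, `poolSize`, and `jvmHeap` are illustrative names (`poolSize` standing for the on-heap execution pool configured in the temporary `UnifiedMemoryManager`, `jvmHeap` for what the JVM heap can actually back):

```scala
// Illustrative sketch only: shows why the retry terminates when the pool size is close to
// the real JVM heap, and effectively never terminates when the pool size is Long.MaxValue / 2.
def allocateWithRetry(request: Long, poolSize: Long, jvmHeap: Long): Long = {
  var acquiredButNotUsed = 0L
  while (true) {
    // The memory manager grants memory as long as its pool is not exhausted.
    val granted = math.min(request, poolSize - acquiredButNotUsed)
    if (granted < request) {
      // Pool exhausted: a managed "unable to acquire memory" error ends the loop.
      throw new RuntimeException(s"Unable to acquire $request bytes, got $granted")
    }
    if (acquiredButNotUsed + granted <= jvmHeap) {
      // The JVM heap can actually back the allocation, so it succeeds.
      return granted
    }
    // Otherwise the real allocation hit a JVM OutOfMemoryError; the grant is kept as
    // acquiredButNotUsed and the allocation is retried against a shrunken remaining pool.
    acquiredButNotUsed += granted
  }
  -1L // unreachable
}
```

With `poolSize = Long.MaxValue / 2` the remaining pool never runs out for any realistic request size, so the retry keeps looping and Spark hangs; with `poolSize = Runtime.getRuntime.maxMemory / 2` the pool is exhausted after a few retries and a managed OOM is raised instead.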
| test("UnsafeHashedRelation should throw OOM when there isn't enough memory") { | ||
| val relations = mutable.ArrayBuffer[HashedRelation]() | ||
| // We should finally see an OOM thrown since we are keeping allocating hashed relations. |
This is a bad test, and will likely break the CI process. Can we put it in the PR description as a manual test?
Hi @cloud-fan, thanks for reviewing.
> This is a bad test, and will likely break the CI process.

If you meant that the OOM error could break the CI, I think we already rely on similar logic in the production code:
spark/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
Lines 390 to 403 in dce992b
```java
try {
  page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
  logger.warn("Failed to allocate a page ({} bytes), try again.",
    MDC.of(LogKeys.PAGE_SIZE, acquired));
  // there is no enough memory actually, it means the actual free memory is smaller than
  // MemoryManager thought, we should keep the acquired memory.
  synchronized (this) {
    acquiredButNotUsed += acquired;
    allocatedPages.clear(pageNumber);
  }
  // this could trigger spilling to free some pages.
  return allocatePage(size, consumer);
}
```
Or is there anything else you are concerned about?
It's OK to test with a "managed" OOM that is thrown by us, but not a real OOM that destabilizes the CI service.
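For illustration, a hedged sketch of the distinction being drawn here (not code proposed for this PR); `classifyOom` is a hypothetical helper:

```scala
import org.apache.spark.memory.SparkOutOfMemoryError

// SparkOutOfMemoryError is raised by Spark's own memory accounting (a "managed" OOM), so a
// test can assert on it without the JVM heap ever being truly exhausted. A plain
// java.lang.OutOfMemoryError means the JVM itself ran out of heap and may destabilize the run.
def classifyOom(t: Throwable): String = t match {
  case _: SparkOutOfMemoryError => "managed OOM raised by Spark's memory manager"
  case _: OutOfMemoryError      => "real JVM OOM; avoid triggering this in CI"
  case _                        => "not an OOM"
}
```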
Sounds good to me. Removed and updated the PR description.
```scala
    Runtime.getRuntime.maxMemory / 2, 1),
  0)
// ...
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
```
Do you know if the per-JVM memory manager here can be used?
Not right now, but I think that's the final target. We cannot directly change the memory manager used here, because currently Spark has to keep BHJ's memory allocation on the JVM heap, and the per-JVM memory manager may be using off-heap mode.
#52817 will improve SHJ to make sure it follows the per-JVM memory manager's memory mode, but we need a separate solution for BHJ in the future (which relies on the read code path marked here).
can we fix the CI?

…ashedRelation.scala Co-authored-by: Wenchen Fan <[email protected]>

@cloud-fan Fixed.
```diff
- Long.MaxValue / 2,
- 1),
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1),
```
Hi, @zhztheplayer . Please split this into two lines.
```diff
- Runtime.getRuntime.maxMemory / 2, 1),
+ Runtime.getRuntime.maxMemory / 2,
  1),
```
```diff
- Long.MaxValue / 2,
- 1),
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1),
```
ditto.
```diff
- Long.MaxValue / 2,
- 1)
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1)
```
ditto.
@dongjoon-hyun Done.
dongjoon-hyun left a comment
+1, LGTM (with three minor style comments).
… memory for broadcast hashed relation
### What changes were proposed in this pull request?
A fix to make Spark throw an OOM rather than hang when there is not enough JVM heap memory for a broadcast hashed relation. The fix is done by passing the current JVM heap size rather than `Long.MaxValue / 2` when creating the temporary `UnifiedMemoryManager` used for broadcasting.
This is the optimal setting: if the size we pass is too large (e.g., the current `Long.MaxValue / 2`), Spark hangs; if the size is smaller than the current JVM heap size, the OOM might be thrown too early even when there is still room in memory for the newly created hashed relation.
Before:
```scala
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue / 2,
1)
```
After:
```scala
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Runtime.getRuntime.maxMemory,
Runtime.getRuntime.maxMemory / 2,
1)
```
### Why are the changes needed?
To report the error fast instead of hanging.
### Does this PR introduce _any_ user-facing change?
In some scenarios where large unsafe hashed relations are allocated for broadcast hash join, users will see a meaningful OOM instead of a hang.
Before (hangs):
```
15:07:38.456 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.501 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.539 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.580 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.613 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
15:07:38.647 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (8589934592 bytes), try again.
...
```
After (OOM):
```
An exception or error caused a run to abort: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 8589934592 bytes of memory, got 7194909081. SQLSTATE: 53200
org.apache.spark.memory.SparkOutOfMemoryError: [UNABLE_TO_ACQUIRE_MEMORY] Unable to acquire 8589934592 bytes of memory, got 7194909081. SQLSTATE: 53200
at org.apache.spark.errors.SparkCoreErrors$.outOfMemoryError(SparkCoreErrors.scala:456)
at org.apache.spark.errors.SparkCoreErrors.outOfMemoryError(SparkCoreErrors.scala)
at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
at org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:868)
at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:202)
at org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:209)
at org.apache.spark.sql.execution.joins.UnsafeHashedRelation$.apply(HashedRelation.scala:464)
at org.apache.spark.sql.execution.joins.HashedRelationSuite.$anonfun$new$90(HashedRelationSuite.scala:760)
```
### How was this patch tested?
The following code was used for a manual test, since we don't want to add a test case that captures an OOM error:
```scala
// Follow the PR's approach of using `Runtime.getRuntime.maxMemory` as the maximum size.
val umm = new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Runtime.getRuntime.maxMemory,
Runtime.getRuntime.maxMemory / 2,
1)
val mm = new TaskMemoryManager(umm, 0)
val relations = mutable.ArrayBuffer[HashedRelation]()
// We should finally see an OOM thrown since we keep allocating hashed relations.
assertThrows[SparkOutOfMemoryError] {
while (true) {
// Allocates ~128 MiB each time.
relations += UnsafeHashedRelation(Iterator.empty, Nil, 1 << 22, mm)
}
}
// Releases the allocated memory.
relations.foreach(_.close())
mm.cleanUpAllAllocatedMemory
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #53065 from zhztheplayer/wip-54353-mm-hang.
Authored-by: Hongze Zhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit ac69d93)
Signed-off-by: Dongjoon Hyun <[email protected]>
Merged to master/4.1 for Apache Spark 4.1.0 to fix the hang issues. Thank you, @zhztheplayer, @cloud-fan, @yaooqinn.

Thanks for the reviews, everyone.