@@ -143,9 +143,8 @@ private[execution] object HashedRelation {
new TaskMemoryManager(
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
- Long.MaxValue,
- Long.MaxValue / 2,
- 1),
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1),
0)
}

@@ -401,9 +400,8 @@ private[joins] class UnsafeHashedRelation(
val taskMemoryManager = new TaskMemoryManager(
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
- Long.MaxValue,
- Long.MaxValue / 2,
- 1),
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1),
Member

Hi, @zhztheplayer. Please split this into two lines.

- Runtime.getRuntime.maxMemory / 2, 1),
+ Runtime.getRuntime.maxMemory / 2,
  1),

0)

val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
Member

@yaooqinn yaooqinn Nov 19, 2025

Do you know if the per-JVM memory manager here can be used?

Member Author

@zhztheplayer zhztheplayer Nov 19, 2025

Not right now, but I think it's the final target. We cannot directly change the memory manager here, because Spark currently has to keep BHJ's memory allocation on the JVM heap, while the per-JVM memory manager may be using off-heap mode.

#52817 will improve SHJ to make sure it follows the per-JVM memory manager's memory mode, but we need a separate solution for BHJ in the future (which relies on the read code path marked here).

@@ -576,9 +574,8 @@ private[execution] final class LongToUnsafeRowMap(
new TaskMemoryManager(
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
- Long.MaxValue,
- Long.MaxValue / 2,
- 1),
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1),
Member

ditto.

0),
0)
}
@@ -27,7 +27,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.SparkException
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Kryo._
- import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
+ import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -42,9 +42,8 @@ import org.apache.spark.util.collection.CompactBuffer
class HashedRelationSuite extends SharedSparkSession {
val umm = new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
- Long.MaxValue,
- Long.MaxValue / 2,
- 1)
+ Runtime.getRuntime.maxMemory,
+ Runtime.getRuntime.maxMemory / 2, 1)
Member

ditto.


val mm = new TaskMemoryManager(umm, 0)

@@ -753,4 +752,18 @@ class HashedRelationSuite extends SharedSparkSession {
map.free()
}
}

+ test("UnsafeHashedRelation should throw OOM when there isn't enough memory") {
Contributor

why did it hang before?

Member Author

@zhztheplayer zhztheplayer Nov 18, 2025

It's related to logic introduced in PR #11095. In that PR, the following "retry code" is based on the assumption that the actual JVM heap memory could be slightly smaller than the on-heap size specified in UMM:

https://github.com/davies/spark/blob/7ec7660381f3cd2047658f67b1882fccd83e95e5/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L268-L276

Because the code assumes that the on-heap size specified in UMM is only finitely larger than the actual JVM heap size, the call returns as soon as the current size plus the acquiredButNotUsed size reaches the specified heap size limit.

However, we set the on-heap size to an effectively infinite value for the broadcast hashed relation:

val mm = Option(taskMemoryManager).getOrElse {
new TaskMemoryManager(
new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue / 2,
1),
0)
}
So the "retry code" mentioned above will never end.
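
To illustrate the explanation above, here is a minimal toy model of an allocate-and-retry loop. It is a sketch under stated assumptions, not Spark's actual TaskMemoryManager code: the class `RetryLoopSketch`, the method `retriesUntilManagedOom`, and all of its parameters are hypothetical names, a failed JVM allocation is simulated by a simple size check, and `maxRetries` exists only so that the demo itself terminates.

```java
/** Toy model of an allocate-and-retry loop; hypothetical, not Spark's code. */
final class RetryLoopSketch {

    /**
     * Counts failed page allocations before the pool limit is reached.
     * Returns -1 if the loop is still retrying after maxRetries failures.
     *
     * @param poolLimit  on-heap size the memory manager believes it has
     * @param realHeap   bytes the simulated JVM can actually hand out
     * @param pageSize   bytes requested per page
     * @param maxRetries safety cap for this demo only
     */
    static long retriesUntilManagedOom(
            long poolLimit, long realHeap, long pageSize, long maxRetries) {
        long used = 0;               // bytes backed by real pages
        long acquiredButNotUsed = 0; // bytes kept reserved after failed allocations
        long retries = 0;
        while (true) {
            // Step 1: the manager grants memory only while the pool has room.
            // Written as a subtraction so that Long.MAX_VALUE does not overflow.
            if (pageSize > poolLimit - used - acquiredButNotUsed) {
                return retries; // "managed" OOM: the accounting limit was hit
            }
            // Step 2: the real allocation succeeds only if the heap has room.
            if (pageSize <= realHeap - used) {
                used += pageSize;
                continue;
            }
            // Step 3: simulated OutOfMemoryError. Keep the reservation and retry.
            acquiredButNotUsed += pageSize;
            retries++;
            if (retries >= maxRetries) {
                return -1; // still retrying; in this model the loop has no practical end
            }
        }
    }

    public static void main(String[] args) {
        // Limit slightly above the real heap: the loop ends after two retries.
        System.out.println(retriesUntilManagedOom(1200L, 1000L, 100L, 1_000_000L)); // prints 2
        // Limit of Long.MAX_VALUE: the cap is hit long before the limit is.
        System.out.println(retriesUntilManagedOom(Long.MAX_VALUE, 1000L, 100L, 1_000_000L)); // prints -1
    }
}
```

In this model the number of retries is roughly (poolLimit - realHeap) / pageSize, so a limit close to the real heap size bounds the loop, while a limit of `Long.MAX_VALUE` does not in any practical sense.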

+ val relations = mutable.ArrayBuffer[HashedRelation]()
+ // We should finally see an OOM thrown since we are keeping allocating hashed relations.
Contributor

This is a bad test and will likely break the CI process. Can we put it in the PR description as a manual test?

Member Author

Hi @cloud-fan, thanks for reviewing.

> This is a bad test and will likely break the CI process.

If you mean that the OOM error could break the CI, I think we already rely on similar logic in the production code:

try {
page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
} catch (OutOfMemoryError e) {
logger.warn("Failed to allocate a page ({} bytes), try again.",
MDC.of(LogKeys.PAGE_SIZE, acquired));
// there is no enough memory actually, it means the actual free memory is smaller than
// MemoryManager thought, we should keep the acquired memory.
synchronized (this) {
acquiredButNotUsed += acquired;
allocatedPages.clear(pageNumber);
}
// this could trigger spilling to free some pages.
return allocatePage(size, consumer);
}
So I thought it would be benign to catch them in tests?

Or is there anything else you are concerned about?

Contributor

It's OK to test with a "managed" OOM that is thrown by us, but not with a real OOM that destabilizes the CI service.
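
As a rough illustration of that distinction, the sketch below raises an out-of-memory condition purely from bookkeeping, without ever allocating heap memory, so a test can trigger it safely. Everything in it is an assumption made for illustration: `ManagedOomSketch`, `ManagedOomException`, `reserve`, and `reservationsUntilOom` are hypothetical names and are not part of Spark's API.

```java
/** Toy accounting pool; a hypothetical stand-in, not Spark's memory manager. */
final class ManagedOomSketch {

    /** Hypothetical "managed" OOM: raised by bookkeeping, not by the JVM. */
    static final class ManagedOomException extends RuntimeException {
        ManagedOomException(String message) {
            super(message);
        }
    }

    private final long limitBytes;
    private long reservedBytes = 0;

    ManagedOomSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    /** Records a reservation in the ledger only; no heap memory is allocated. */
    void reserve(long bytes) {
        if (bytes <= 0) {
            throw new IllegalArgumentException("bytes must be positive");
        }
        long remaining = limitBytes - reservedBytes;
        if (bytes > remaining) {
            throw new ManagedOomException(
                "cannot reserve " + bytes + " bytes, only " + remaining + " left");
        }
        reservedBytes += bytes;
    }

    /** Counts successful reservations before the managed OOM is raised. */
    static int reservationsUntilOom(long limitBytes, long chunkBytes) {
        ManagedOomSketch pool = new ManagedOomSketch(limitBytes);
        int count = 0;
        try {
            while (true) {
                pool.reserve(chunkBytes);
                count++;
            }
        } catch (ManagedOomException expected) {
            // The limit was hit in the ledger; the JVM heap was never under pressure.
            return count;
        }
    }

    public static void main(String[] args) {
        // A 1 GiB budget consumed in 128 MiB chunks.
        System.out.println(reservationsUntilOom(1L << 30, 128L << 20)); // prints 8
    }
}
```

A test built this way fails fast and deterministically, whereas a test that waits for the JVM itself to run out of heap can take other work in the same process down with it.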

Member Author

Sounds good to me. Removed and updated the PR description.

+ assertThrows[SparkOutOfMemoryError] {
+ while (true) {
+ // Allocates ~128 MiB each time.
+ relations += UnsafeHashedRelation(Iterator.empty, Nil, 1 << 22, mm)
+ }
+ }
+ // Releases the allocated memory.
+ relations.foreach(_.close())
+ mm.cleanUpAllAllocatedMemory
+ }
}