Skip to content

Commit 68ec4d6

Browse files
mukulmurthyzsxwing
authored andcommitted
[SPARK-25181][CORE] Limit Thread Pool size in BlockManager Master and Slave endpoints
## What changes were proposed in this pull request? Limit Thread Pool size in BlockManager Master and Slave endpoints. Currently, BlockManagerMasterEndpoint and BlockManagerSlaveEndpoint both have thread pools with nearly unbounded (Integer.MAX_VALUE) numbers of threads. In certain cases, this can lead to driver OOM errors. This change limits the thread pools to 100 threads; this should not break any existing behavior because any tasks beyond that number will get queued. ## How was this patch tested? Manual testing Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22176 from mukulmurthy/25181-threads. Authored-by: Mukul Murthy <[email protected]> Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 2381953 commit 68ec4d6

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class BlockManagerMasterEndpoint(
5454
// Mapping from block id to the set of block managers that have the block.
5555
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
5656

57-
private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
57+
private val askThreadPool =
58+
ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
5859
private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
5960

6061
private val topologyMapper = {

core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class BlockManagerSlaveEndpoint(
3737
extends ThreadSafeRpcEndpoint with Logging {
3838

3939
private val asyncThreadPool =
40-
ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
40+
ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool", 100)
4141
private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
4242

4343
// Operations that involve removing blocks may be slow and should be done asynchronously

0 commit comments

Comments
 (0)