@@ -296,17 +296,21 @@ public long maxChunksBeingTransferred() {
   * and could take a long time to process due to disk contention. By configuring a slightly
   * higher number of shuffle server threads, we are able to reserve some threads for
   * handling other RPC messages, thus making the client less likely to experience a timeout
-  * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
-  * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads
-  * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads.
+  * when sending RPC messages to the shuffle server. The number of threads used for handling
+  * chunked fetch requests is a percentage of io.serverThreads (if defined), otherwise a
+  * percentage of 2 * #cores. However, a percentage of 0 means the Netty default number of
+  * threads, which is 2 * #cores, ignoring io.serverThreads. The percentage is configured via
+  * spark.shuffle.server.chunkFetchHandlerThreadsPercent. The returned value is rounded up
+  * to the nearest integer.
*/
public int chunkFetchHandlerThreads() {
if (!this.getModuleName().equalsIgnoreCase("shuffle")) {
return 0;
}
int chunkFetchHandlerThreadsPercent =
-      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
-    return this.serverThreads() > 0 ? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100:
-        (2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100;
+      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
+    return (int)Math.ceil(
+        (this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors()) *
+        chunkFetchHandlerThreadsPercent/(double)100);
}
}
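
For reference, here is a minimal standalone sketch (not part of the PR) of the before/after arithmetic, using made-up inputs for the server thread count and the percentage; the real method reads both from TransportConf (io.serverThreads and spark.shuffle.server.chunkFetchHandlerThreadsPercent):

// Illustrative sketch only: reproduces the old and new arithmetic with made-up
// inputs (serverThreads = 7, percent = 50), not the values TransportConf would read.
public class ChunkFetchThreadsSketch {

  // Old behaviour: integer division truncates, so 7 * 50 / 100 = 3 threads.
  static int oldStyle(int serverThreads, int percent) {
    return (serverThreads * percent) / 100;
  }

  // New behaviour: round up to the nearest whole thread, so ceil(3.5) = 4 threads.
  static int newStyle(int serverThreads, int percent) {
    return (int) Math.ceil(serverThreads * percent / (double) 100);
  }

  public static void main(String[] args) {
    System.out.println(oldStyle(7, 50));  // prints 3
    System.out.println(newStyle(7, 50));  // prints 4
  }
}

The switch to Math.ceil means any non-zero percentage yields at least one dedicated chunk-fetch handler thread, whereas the old integer division could silently round down to zero.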