
Conversation

@redsanket commented Aug 21, 2018

What changes were proposed in this pull request?

Description:
Right now, the default number of server-side Netty handler threads is 2 * the number of cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads.
Processing a client request requires one available server Netty handler thread.
However, when the server Netty handler threads start to process ChunkFetchRequests, they become blocked on disk I/O, mostly due to disk contention from the random read operations initiated by all the ChunkFetchRequests received from clients.
As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side Netty handler threads can all be blocked on reading shuffle files, leaving no handler thread available to process the other types of requests, which should all be very quick to process.

This issue can be fixed by limiting the number of Netty handler threads that can get blocked when processing ChunkFetchRequest. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequest. This enables the shuffle server to reserve Netty handler threads for non-ChunkFetchRequest messages, thus enabling consistent processing time for these requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeout issues with either executor registration with the local shuffle server or the shuffle client establishing a connection with the remote shuffle server.

For the original PR, please refer to
#21402

How was this patch tested?

Unit tests and stress testing.


@redsanket (Author) commented Aug 21, 2018

@tgravescs @vanzin @Victsm please review thanks

@tgravescs (Contributor) commented:

ok to test

@vanzin (Contributor) commented Aug 21, 2018

@SparkQA commented Aug 21, 2018

Test build #95042 has finished for PR 22173 at commit 3bab74c.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

.addLast("handler", channelHandler);
.addLast("handler", channelHandler)
// Use a separate EventLoopGroup to handle ChunkFetchRequest messages.
.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
@vanzin (Contributor) commented Aug 22, 2018:

Hmm... I think there is some waste here. Not all channels actually need the chunk fetch handler. Basically only the shuffle server (external or not) does. So for all other cases - RpcEnv server and clients, shuffle clients - you'd have this new thread pool just sitting there.

It would be good to avoid that.

@redsanket (Author) replied:

Yes, I did notice that... makes sense.

@SparkQA commented Aug 27, 2018

Test build #95289 has finished for PR 22173 at commit cc40d9b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 27, 2018

Test build #95293 has finished for PR 22173 at commit 6580ff1.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

complete expansion of imports

rearrange imports
@SparkQA commented Aug 27, 2018

Test build #95299 has finished for PR 22173 at commit 50258f7.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 27, 2018

Test build #95300 has finished for PR 22173 at commit d86503c.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 28, 2018

Test build #95352 has finished for PR 22173 at commit 470e9a6.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2018

Test build #95423 has finished for PR 22173 at commit dcc41f5.

  • This patch fails Java style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 29, 2018

Test build #95434 has finished for PR 22173 at commit b1105bd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Aug 30, 2018

retest this please

@vanzin (Contributor) commented Aug 30, 2018

(I haven't forgotten about this, just haven't had the time to look at it.)

@SparkQA commented Aug 31, 2018

Test build #95499 has finished for PR 22173 at commit b1105bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@redsanket (Author) commented:

thanks @vanzin, also @tgravescs gentle ping...

@tgravescs (Contributor) left a review comment:

I assume we have the same issue here with processStreamRequest, since it's going to hold a thread. Same goes, I think, for the uploadStream request. We could move those to be under the same thread pool as the chunk fetch requests, or we could do those as a separate JIRA, since the majority of requests will be chunk fetch requests.

}
int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100:
(Contributor) commented:

space between 0 and ?, and space between 100 and :

(Contributor) commented:

I assume we aren't documenting chunkFetchHandlerThreadsPercent since the serverThreads config isn't documented.

@redsanket (Author) replied:

I think it is a good idea to document both as this is an important config. Let me know your thoughts

* which equals 0.1 * 2*#cores or 0.1 * io.serverThreads.
*/
public int chunkFetchHandlerThreads() {
if(!this.getModuleName().equalsIgnoreCase("shuffle")) {
(Contributor) commented:

space after if before (

* higher number of shuffler server threads, we are able to reserve some threads for
* handling other RPC messages, thus making the Client less likely to experience timeout
* when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores
* or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads
(Contributor) commented:

I realize these are just examples, but normally I would expect a user to have many more threads processing chunk requests than threads doing other things, so perhaps the example should be something like 90%.

Were any tests done to see how many threads were needed for the others? I would expect very few.

@redsanket (Author) replied:

No, I have not tested how many threads are required for the other RPC calls, but the whole point is to reduce the dependency on how much time the ChunkFetchRequests spend doing disk I/O.

int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100:
(2* NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100;
(Contributor) commented:

space between 2 and *

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;


(Contributor) commented:

remove extra line.

private ChannelFuture respond(final Channel channel,
final Encodable result) throws InterruptedException {
final SocketAddress remoteAddress = channel.remoteAddress();
return channel.writeAndFlush(result).sync().addListener((ChannelFutureListener) future -> {
(Contributor) commented:

Do we actually want to use await() here instead of sync()? sync() says: "Waits for this future until it is done, and rethrows the cause of the failure if this future failed."

I'm not sure we want a rethrow here, since we have the addListener to handle the future failure.

@redsanket (Author) replied:

Yes, await() can be used as well... I will test it out and let you know its ramifications, if any. Thanks.

@redsanket (Author) commented Sep 17, 2018:

OK, I figured out the following... I think it is better to rethrow, i.e. use sync(), instead of quietly logging the exception via await(). The reason is as follows:

The respond call made here https://github.com/apache/spark/pull/22173/files#diff-a37b07d454be4d6cd26edb294661d4e3R106 waits for the failed or success response to be sent back; hence, it is a blocking call. If an exception is thrown and we use await(), we quietly log and do not rethrow, so the underlying RPC handler will not know about the request status here https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java#L183. Rethrowing the exception allows it to handle it and throw an RPC failure here https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java#L194; otherwise we have to wait for a timeout exception instead of an InterruptedException. Either case seems fine, but with sync() we will fail fast...

(Contributor) commented:

So it doesn't go through https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java#L183, since that is the rpcHandler. The TransportChannelHandler won't handle it at all; it will go to the pipeline, which calls the ChunkFetchRequestHandler. So whatever is calling that is what would get the exception propagated. The reason I said this is that previously, when this was handled in TransportChannelHandler, it wasn't throwing the exception up. TransportRequestHandler.respond calls writeAndFlush asynchronously and then just adds a listener, which would simply log an error if the future wasn't successful.
I want to make sure that if an exception is thrown here, it doesn't kill the entire external shuffle service, for instance.

@redsanket (Author) replied:

OK, I had chunkFetchHandler as an instance of rpcHandler in my mind...

@redsanket (Author) commented Sep 18, 2018:

It just logs the exception, but the behaviour is the same… in the first case an additional warning from the Netty side is passed on to the listener and the channel is closed… in the latter we quietly log an error and the channel is closed… the executors deal with the closed channel and retry or schedule the task on a different executor… and eventually the job seems to succeed in the example I ran… I expect the retry to go through in such scenarios… I will just use await(), since the ERROR is logged and we don't have to be more verbose with the warning from the Netty side...

@SparkQA commented Sep 10, 2018

Test build #95876 has finished for PR 22173 at commit 8153de5.

  • This patch fails from timeout after a configured wait of `400m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) commented:

test this please

@SparkQA commented Sep 17, 2018

Test build #96140 has finished for PR 22173 at commit 8153de5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;

synchronized(this.getClass()) {
(Contributor) commented:

I think synchronized(this.getClass()) is not recommended due to it not handling inheritance and such. Use synchronized(TransportContext.class)


synchronized(this.getClass()) {
if (chunkFetchWorkers == null && conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle")) {
(Contributor) commented:

fix spacing here, line up conf.getModuleName with the chunkFetchWorkers

chunkFetchWorkers = NettyUtils.createEventLoop(
IOMode.valueOf(conf.ioMode()),
conf.chunkFetchHandlerThreads(),
"chunk-fetch-handler");
(Contributor) commented:

for consistency perhaps name thread shuffle-chunk-fetch-handler

(Contributor) commented:

Like you mention, if we can avoid creating the event loop on the client side, that would be best.

TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
ChunkFetchRequestHandler chunkFetchHandler =
createChunkFetchHandler(channelHandler, channelRpcHandler);
(Contributor) commented:

fix spacing, only indented 2 spaces

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()),
cause);
(Contributor) commented:

spacing 2, fix throughout the file

(Contributor) commented:

this can actually be unwrapped here

logger.error(String.format("Error opening block %s for request from %s",
msg.streamChunkId, getRemoteAddress(channel)), e);
respond(channel,
new ChunkFetchFailure(msg.streamChunkId,
(Contributor) commented:

fix wrapping and spacing

.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("idleStateHandler",
new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
(Contributor) commented:

fix indentation

import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.*;
import org.apache.spark.network.protocol.*;
import org.apache.spark.network.client.RpcResponseCallback;
(Contributor) commented:

revert the imports to be .*


return 0;
}
int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0);
(Contributor) commented:

fix spacing

@redsanket (Author) replied:

Yes, it is documented above... if it is 0 or 100, it is 2 * #cores or io.serverThreads.

@SparkQA commented Sep 20, 2018

Test build #96307 has finished for PR 22173 at commit 0348ec8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor) left a review comment:

+1. looks good.

I'm not super fond of the isClientOnly parameter to TransportContext (which I recommended instead of other approaches), but I can't think of a more elegant solution at this point.

@tgravescs (Contributor) commented:

merged into master (2.5.0)

@asfgit asfgit closed this in ff601cf Sep 21, 2018
@redsanket (Author) commented:

closes #21402

// Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
// max number of TransportServer worker threads that are blocked on writing response
// of ChunkFetchRequest message back to the client via the underlying channel.
private static EventLoopGroup chunkFetchWorkers;
(Member) commented:

Is there any special reason that this must be a global one? I have not yet looked at the details, but it looks like this may make ChunkFetchIntegrationSuite flaky, as there is no isolation between tests.

@redsanket (Author) replied:

I haven't been able to reproduce this, but the number of threads used for these tests is 2 * number of cores or spark.shuffle.io.serverThreads.

Willymontaz pushed a commit to Willymontaz/spark that referenced this pull request Feb 12, 2019
…dle block fetch requests.


Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Willymontaz added a commit to criteo-forks/spark that referenced this pull request Feb 12, 2019
…dle block fetch requests. (#89)


Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Jul 31, 2019
…dle block fetch requests.


Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
prakharjain09 pushed a commit to prakharjain09/spark that referenced this pull request Nov 29, 2019
…dle block fetch requests.


Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit ff601cf)
@cloud-fan (Contributor) commented:

We hit a significant performance regression in our internal workload caused by this commit. After this commit, the executor can handle at most N chunk fetch requests at the same time, where N is spark.shuffle.io.serverThreads * spark.shuffle.server.chunkFetchHandlerThreadsPercent / 100. Previously it was unlimited, and most of the time we could saturate the underlying channel.

This commit does fix a nasty problem, and I'm fine with it even if it may introduce a perf regression, but there should be a way to turn it off. Unfortunately, we can't turn off this feature. We can set spark.shuffle.server.chunkFetchHandlerThreadsPercent to a large value so that we can handle many chunk fetch requests at the same time, but it's hard to pick a good value that is not too large and still saturates the channel.

Looking back at this problem, I think we can either create a dedicated channel for non-chunk-fetch requests, or ask Netty to handle channel writes of non-chunk-fetch requests first. Both seem hard to implement. Shall we revert it first, and think of a good fix later?

@tgravescs (Contributor) commented:

Can you clarify? The default for spark.shuffle.server.chunkFetchHandlerThreadsPercent is 0 which should be the same number of chunk fetcher threads as previously. It wasn't previously unlimited as Netty would limit to 2*number of cores by default. (https://github.com/netty/netty/blob/9621a5b98120f9596b5d2a337330339dda199bde/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40)

Were you configuring Netty or something to make it unlimited? Or perhaps the default for Netty changed or we missed something?

The intention was no perf regression and the same default as the previous behavior. Note there was a follow-on to this PR to fix the default calculation properly:
https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java#L340
It does introduce more overall threads, and the chunk fetcher ones do have to go back through the original event loop group.

Please explain more what you are seeing and what settings you are using.

@cloud-fan (Contributor) commented:

It's good to know that the underlying channel write thread pool has the same concurrency as the request-handling thread pool.

So there are 2 thread pools: one to handle the requests, one to write data to the channel.

Previously, fetch requests were handled by the request-handling thread pool, and the handler returned immediately after reading the shuffle blocks.

Now, fetch requests are handled by a new fetch-request thread pool, and the handler does not return until the channel write is completed. This effectively handles fetch requests in sync mode, whereas previously it was more likely to keep both thread pools busy, one reading shuffle blocks and one writing data to the channel.

Unfortunately I can't share our internal workload (we don't have special settings), I'll try to write a microbenchmark.

@tgravescs (Contributor) commented:

Sorry, I'm not quite understanding what you are saying; I think perhaps you mean fetch requests in "async" mode?
There are now 2 thread pools. The fetch requests go to the new thread pool, but the response does still have to go back through the original event loop group, so the fetches are somewhat async now, as the comment for the "flush" function mentions (see the comment: https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java#L118). It does add an await() call there to help with some throttling, so it doesn't take up all the threads again, and perhaps that is causing some slowness if the event loop channel it's registered to is also processing some incoming events in between.
If you can get more information from your workload to see what the threads are doing, or have a test case to reproduce, that would be very helpful.

@cloud-fan (Contributor) commented:

When I say sync mode, I mean a thread that handles a fetch request has to finish reading the shuffle blocks and writing the response to the channel before handling the next request, although the channel writing is done by another thread pool. Previously it was fully async: the thread could handle the next request once it finished reading the shuffle blocks of the current request. That said, the throughput of handling fetch requests is reduced now.

Anyway let me come up with a microbenchmark first.

@cloud-fan (Contributor) commented:

Unfortunately, I'm not able to minimize our internal workload, so I switched to TPCDS to show the perf regression.

data: TPCDS table store_sales with scale factor 99. It's 3.5GB, 1233 files
query: sql("select count(distinct ss_list_price) from store_sales where ss_quantity == 5").show
spark: latest master, "local-cluster[2, 4, 19968]"
env: m4-4xlarge

Since reverting this commit would involve too many changes, I simply removed the await in ChunkFetchRequestHandler, which effectively reverts this feature.

With await removed, the query runs 4% faster, which is not much. But if you look at the web UI and check the task metrics, shuffle read time is significantly reduced if we remove await.

The master branch: [web UI screenshot: stage task metrics]
and the second stage: [screenshot]

With await removed: [web UI screenshot: stage task metrics]
and the second stage: [screenshot]

The shuffle read is about 3x faster with await removed.

@tgravescs (Contributor) commented:

The jobs we ran didn't see an overall performance impact, and it greatly helped when running on a busy multi-tenant cluster.

How about this: if the value of spark.shuffle.server.chunkFetchHandlerThreadsPercent isn't explicitly set by the user, we just don't add the extra event loop or thread pool (see TransportContext) - just like it already does when the module isn't shuffle. That should be a fairly easy change.

@cloud-fan (Contributor) commented:

Sounds good to me. We can explore different solutions after 3.0.

@otterc (Contributor) commented Jan 24, 2020

I think await doesn't provide any benefit and could be removed.
When the chunk fetch event loop runs

channel.writeAndFlush(result)

This adds a WriteAndFlushTask in the pendingQueue of the default server-IO thread registered with that channel.

The code in NioEventLoop.run() itself throttles the number of tasks that can be run at a time from its pending queue.
Here is the code:

                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }

Here it records how much time it took to perform the IO operations, that is, execute processSelectedKeys(). runAllTasks, which is the method that processes the tasks from pendingQueue, will be performed for the same amount of time.

runAllTasks() does process 64 tasks and then checks the time.

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            } 

This ensures that the default server-IO thread always gets time to process the ready channels. It's not always busy processing WriteAndFlushTasks.

@Victsm (Contributor) commented Jan 24, 2020

When we worked on the original fix in #21402, we were holding a wrong view of how Netty handles both event loop groups.
We thought both the chunk fetch requests and the control plane RPCs were going to become tasks placed in the task queues of threads inside the server I/O event loop group.
That was the reason we used sync or await in the patch: so that we could limit the number of WriteAndFlushTasks placed by chunk fetch requests in the task queues of threads in the server I/O event loop group, which would allow the control plane RPCs to be picked up by the I/O threads in time to avoid timeouts.
More recently, while working on SPARK-30512, we got a closer and much more correct view of how Netty handles these 2 event loop groups (the chunk-fetch dedicated one and the I/O event loop group).
As mentioned by @otterc, Netty should already process the data plane RPCs and control plane RPCs separately once we set up the dedicated event loop for chunk fetch requests.
Thus, we don't really need to throttle the number of tasks placed by chunk fetch requests in the task queues of threads in the I/O event loop group.

We have an internal stress testing framework that can help to validate whether the original timeout issue still occurs after removing await.
We will do the test to validate that things are working as expected.
If so, we can keep the benefit of this fix without the performance regression.

@otterc (Contributor) commented Jan 24, 2020

@Victsm @tgravescs
I removed the await and tested with our internal stress testing framework. I started seeing SASL requests timing out. In this test, I observed more than a 2-minute delay between channel registration and when the first bytes were read from the channel.

2020-01-24 22:53:34,019 DEBUG org.spark_project.io.netty.handler.logging.LoggingHandler: [id: 0xd475f5ff, L:/10.150.16.27:7337 - R:/10.150.16.44:11388] REGISTERED
2020-01-24 22:53:34,019 DEBUG org.spark_project.io.netty.handler.logging.LoggingHandler: [id: 0xd475f5ff, L:/10.150.16.27:7337 - R:/10.150.16.44:11388] ACTIVE

2020-01-24 22:55:05,207 DEBUG org.spark_project.io.netty.handler.logging.LoggingHandler: [id: 0xd475f5ff, L:/10.150.16.27:7337 - R:/10.150.16.44:11388] READ: 48B
2020-01-24 22:55:05,207 DEBUG org.spark_project.io.netty.handler.logging.LoggingHandler: [id: 0xd475f5ff, L:/10.150.16.27:7337 - R:/10.150.16.44:11388] WRITE: org.apache.spark.network.protocol.MessageWithHeader@27e59ee9
2020-01-24 22:55:05,207 DEBUG org.spark_project.io.netty.handler.logging.LoggingHandler: [id: 0xd475f5ff, L:/10.150.16.27:7337 - R:/10.150.16.44:11388] FLUSH
2020-01-24 22:55:05,207 INFO org.apache.spark.network.server.OutgoingChannelHandler: OUTPUT request 5929104419960968526 channel d475f5ff request_rec 1579906505207 transport_rec 1579906505207 flush 1579906505207  receive-transport 0 transport-flush 0 total 0

Since there is a delay in reading the channel, I suspect this is because of the hardcoded value in the Netty code:
SingleThreadEventExecutor.runAllTasks() checks the time only after 64 tasks, and WriteAndFlush tasks are bulky tasks. With await there will be just 1 WriteAndFlushTask per channel in the IO thread's pending queue, and the rest of the tasks will be smaller tasks.
However, without await there are more WriteAndFlush tasks per channel in the IO thread's queue. Since it processes 64 tasks and then checks the time, this time increases with more WriteAndFlush tasks.

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

I can test this theory by lowering this number in a fork of netty and building spark against it. However, for now we can't remove await().

Note: This test was with a dedicated boss event loop group which is why we don't see any delay in channel registration.

@Victsm (Contributor) commented Jan 25, 2020

@cloud-fan
What do you think of SPARK-30602 in the context of this perf regression you see?
We have also been operating our Spark infrastructure with this change for quite some time, and we do not in general notice performance regressions.
When doing shuffle in a large-scale multi-tenant cluster, the issues we mentioned in SPARK-30602's SPIP doc become much more dominant.
Without the change in SPARK-24355, before the underlying network is saturated, the disk is saturated first due to the small random reads, which then propagates further and starts timing out control plane RPCs.
SPARK-24355 is basically an attempt to stop the small random reads from impacting control plane RPCs, to improve the reliability of the shuffle service.
On top of these, SPARK-30602 will significantly improve the overall throughput and efficiency of Spark shuffle.

@cloud-fan (Contributor) commented:

SPARK-30602 looks too big for this particular issue.

What I am looking for is to disable this feature completely by default (no await). We can further improve it later so that it brings no perf regression, or replace it with SPARK-30602.

@xuanyuanking can you help to do it?

@xuanyuanking (Member) commented:

Sure, I will follow up on this.

cloud-fan pushed a commit that referenced this pull request Mar 26, 2020
…event loop group

### What changes were proposed in this pull request?
Fix the regression caused by #22173.
The original PR changed the logic of handling `ChunkFetchRequest` from async to sync, which caused the shuffle benchmark regression. This PR fixes the regression by moving back to the async mode, reusing the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent`.
When the user sets the config, ChunkFetchRequest will be processed in a separate event loop group; otherwise, the code path is exactly the same as before.

### Why are the changes needed?
Fix the shuffle performance regression described in  #22173 (comment)

### Does this PR introduce any user-facing change?
Yes, this PR disables the separate event loop for FetchRequest by default.

### How was this patch tested?
Existing UT.

Closes #27665 from xuanyuanking/SPARK-24355-follow.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Mar 26, 2020
…event loop group


Closes #27665 from xuanyuanking/SPARK-24355-follow.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0fe203e)
Signed-off-by: Wenchen Fan <[email protected]>
XUJiahua pushed a commit to XUJiahua/spark that referenced this pull request Apr 9, 2020
…dle block fetch requests.


Closes apache#22173 from redsanket/SPARK-24335.

Authored-by: Sanket Chintapalli <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit ff601cf)
(cherry picked from commit 8094f59a8128166dd283d97c49322ec09fa2c93c)

Change-Id: Id91ae9d20ffe9aae09585b197c6a15f4e8044895
(cherry picked from commit 5261726732ef5a9cf21e02a25bba0d14a808c274)

Change-Id: Ic666004b8b3d17038d8f42f4154058c0d00e9a37
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…event loop group


Closes apache#27665 from xuanyuanking/SPARK-24355-follow.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>