Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public TransportContext(

if (conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle") &&
!isClientOnly) {
!isClientOnly && conf.separateChunkFetchRequest()) {
chunkFetchWorkers = NettyUtils.createEventLoop(
IOMode.valueOf(conf.ioMode()),
conf.chunkFetchHandlerThreads(),
Expand Down Expand Up @@ -187,8 +187,6 @@ public TransportChannelHandler initializePipeline(
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
ChunkFetchRequestHandler chunkFetchHandler =
createChunkFetchHandler(channelHandler, channelRpcHandler);
ChannelPipeline pipeline = channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
Expand All @@ -200,6 +198,9 @@ public TransportChannelHandler initializePipeline(
.addLast("handler", channelHandler);
// Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
if (chunkFetchWorkers != null) {
ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
channelHandler.getClient(), rpcHandler.getStreamManager(),
conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
}
return channelHandler;
Expand All @@ -217,19 +218,17 @@ public TransportChannelHandler initializePipeline(
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
ChunkFetchRequestHandler chunkFetchRequestHandler = null;
if (!separateChunkFetchRequest) {
chunkFetchRequestHandler = new ChunkFetchRequestHandler(
client, rpcHandler.getStreamManager(),
conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
}
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections, this);
}

/**
* Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
*/
private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
RpcHandler rpcHandler) {
return new ChunkFetchRequestHandler(channelHandler.getClient(),
rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}

public TransportConf getConf() { return conf; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkF
private final StreamManager streamManager;
/** The max number of chunks being transferred and not finished yet. */
private final long maxChunksBeingTransferred;
private final boolean syncModeEnabled;

public ChunkFetchRequestHandler(
TransportClient client,
StreamManager streamManager,
Long maxChunksBeingTransferred) {
Long maxChunksBeingTransferred,
boolean syncModeEnabled) {
this.client = client;
this.streamManager = streamManager;
this.maxChunksBeingTransferred = maxChunksBeingTransferred;
this.syncModeEnabled = syncModeEnabled;
}

@Override
Expand All @@ -76,6 +79,11 @@ protected void channelRead0(
ChannelHandlerContext ctx,
final ChunkFetchRequest msg) throws Exception {
Channel channel = ctx.channel();
processFetchRequest(channel, msg);
}

public void processFetchRequest(
final Channel channel, final ChunkFetchRequest msg) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
msg.streamChunkId);
Expand Down Expand Up @@ -112,19 +120,26 @@ protected void channelRead0(
* channel will be handled by the EventLoop the channel is registered to. So even
* though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O,
* which is the potentially blocking call that could deplete server handler threads, is still
* being processed by TransportServer's default EventLoopGroup. In order to throttle the max
* number of threads that channel I/O for sending response to ChunkFetchRequest, the thread
* calling channel.writeAndFlush will wait for the completion of sending response back to
* client by invoking await(). This will throttle the rate at which threads from
* ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's
* default EventLoopGroup, thus making sure that we can reserve some threads in
* TransportServer's default EventLoopGroup for handling other RPC messages.
* being processed by TransportServer's default EventLoopGroup.
*
* When syncModeEnabled is true, Spark will throttle the max number of threads that channel I/O
* for sending response to ChunkFetchRequest, the thread calling channel.writeAndFlush will wait
* for the completion of sending response back to client by invoking await(). This will throttle
* the rate at which threads from ChunkFetchRequest dedicated EventLoopGroup submit channel I/O
* requests to TransportServer's default EventLoopGroup, thus making sure that we can reserve
* some threads in TransportServer's default EventLoopGroup for handling other RPC messages.
*/
private ChannelFuture respond(
final Channel channel,
final Encodable result) throws InterruptedException {
final SocketAddress remoteAddress = channel.remoteAddress();
return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> {
ChannelFuture channelFuture;
if (syncModeEnabled) {
channelFuture = channel.writeAndFlush(result).await();
} else {
channelFuture = channel.writeAndFlush(result);
}
return channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
logger.trace("Sent result {} to client {}", result, remoteAddress);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,22 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
private final TransportRequestHandler requestHandler;
private final long requestTimeoutNs;
private final boolean closeIdleConnections;
private final boolean skipChunkFetchRequest;
private final TransportContext transportContext;

public TransportChannelHandler(
TransportClient client,
TransportResponseHandler responseHandler,
TransportRequestHandler requestHandler,
long requestTimeoutMs,
boolean skipChunkFetchRequest,
boolean closeIdleConnections,
TransportContext transportContext) {
this.client = client;
this.responseHandler = responseHandler;
this.requestHandler = requestHandler;
this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
this.skipChunkFetchRequest = skipChunkFetchRequest;
this.closeIdleConnections = closeIdleConnections;
this.transportContext = transportContext;
}
Expand Down Expand Up @@ -124,7 +127,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
*/
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (msg instanceof ChunkFetchRequest) {
if (skipChunkFetchRequest && msg instanceof ChunkFetchRequest) {
return false;
} else {
return super.acceptInboundMessage(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,21 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
/** The max number of chunks being transferred and not finished yet. */
private final long maxChunksBeingTransferred;

/** The dedicated ChannelHandler for ChunkFetchRequest messages. */
private final ChunkFetchRequestHandler chunkFetchRequestHandler;

public TransportRequestHandler(
Channel channel,
TransportClient reverseClient,
RpcHandler rpcHandler,
Long maxChunksBeingTransferred) {
Long maxChunksBeingTransferred,
ChunkFetchRequestHandler chunkFetchRequestHandler) {
this.channel = channel;
this.reverseClient = reverseClient;
this.rpcHandler = rpcHandler;
this.streamManager = rpcHandler.getStreamManager();
this.maxChunksBeingTransferred = maxChunksBeingTransferred;
this.chunkFetchRequestHandler = chunkFetchRequestHandler;
}

@Override
Expand All @@ -97,8 +102,10 @@ public void channelInactive() {
}

@Override
public void handle(RequestMessage request) {
if (request instanceof RpcRequest) {
public void handle(RequestMessage request) throws Exception {
if (request instanceof ChunkFetchRequest) {
chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ public long maxChunksBeingTransferred() {

/**
* Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
* Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
* When the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set,
* shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
* Although when calling the async writeAndFlush on the underlying channel to send
* response back to client, the I/O on the channel is still being handled by
* {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
Expand All @@ -339,12 +340,20 @@ public int chunkFetchHandlerThreads() {
return 0;
}
int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's wrong with the previous code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to give a default value here, when it comes to here, the config must be set.

Copy link
Member

@dongjoon-hyun dongjoon-hyun Mar 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by the config must be set, @xuanyuanking ? What value do you expect by default? Apparently, this seems to revert SPARK-25641 together without mentioning SPARK-25641. In the PR, only SPARK-24355 is mentioned.

No need to give a default value here, when it comes to here, the config must be set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we only call this method if separateChunkFetchRequest returns true.

We will see exception if the assumption is broken.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR, we make the separate event loop group configurable by checking the config spark.shuffle.server.chunkFetchHandlerThreadsPercent is set or not.

What do you mean by the config must be set

Here the function chunkFetchHandlerThreads is only called while the config is set.

if (conf.getModuleName() != null &&
conf.getModuleName().equalsIgnoreCase("shuffle") &&
!isClientOnly && conf.separateChunkFetchRequest()) {
chunkFetchWorkers = NettyUtils.createEventLoop(
IOMode.valueOf(conf.ioMode()),
conf.chunkFetchHandlerThreads(),

What value do you expect by default? Apparently, this seems to revert SPARK-25641 together without mentioning SPARK-25641.

Yes, this PR makes the feature disabled by default, let me also mention SPARK-25641 in PR description.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @xuanyuanking and @cloud-fan .

Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
int threads =
this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
}

/**
* Whether to use a separate EventLoopGroup to process ChunkFetchRequest messages, it is decided
* by the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set or not.
*/
public boolean separateChunkFetchRequest() {
return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
}

/**
* Whether to use the old protocol while doing the shuffle block fetching.
* It is only enabled while we need the compatibility in the scenario of new spark version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;

import io.netty.channel.Channel;
import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -33,6 +32,7 @@
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.*;
import org.apache.spark.network.server.ChunkFetchRequestHandler;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
Expand Down Expand Up @@ -68,7 +68,7 @@ public void handleChunkFetchRequest() throws Exception {
long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
TransportClient reverseClient = mock(TransportClient.class);
ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
rpcHandler.getStreamManager(), 2L);
rpcHandler.getStreamManager(), 2L, false);

RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
requestHandler.channelRead(context, request0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class TransportRequestHandlerSuite {

@Test
public void handleStreamRequest() {
public void handleStreamRequest() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RpcHandler rpcHandler = new NoOpRpcHandler();
OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
Channel channel = mock(Channel.class);
Expand All @@ -66,7 +66,7 @@ public void handleStreamRequest() {

TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
rpcHandler, 2L);
rpcHandler, 2L, null);

RequestMessage request0 = new StreamRequest(String.format("%d_%d", streamId, 0));
requestHandler.handle(request0);
Expand Down