From d4e0352ba54f5e05aae93b1e19183988e2d9f7e6 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 24 Mar 2020 19:36:19 +0800
Subject: [PATCH 1/3] simplify code

---
 .../spark/network/TransportContext.java | 29 ++++++++--------
 .../server/ChunkFetchRequestHandler.java | 33 ++++++++++++++-----
 .../server/TransportChannelHandler.java | 5 ++-
 .../server/TransportRequestHandler.java | 13 ++++++--
 .../spark/network/util/TransportConf.java | 19 +++++++++--
 .../ChunkFetchRequestHandlerSuite.java | 4 +--
 .../network/TransportRequestHandlerSuite.java | 4 +--
 7 files changed, 74 insertions(+), 33 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index d99b9bdbce39..73b78520f9b5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -123,7 +123,7 @@ public TransportContext(
 
     if (conf.getModuleName() != null &&
         conf.getModuleName().equalsIgnoreCase("shuffle") &&
-        !isClientOnly) {
+        !isClientOnly && conf.separateChunkFetchRequest()) {
       chunkFetchWorkers = NettyUtils.createEventLoop(
           IOMode.valueOf(conf.ioMode()),
           conf.chunkFetchHandlerThreads(),
@@ -187,8 +187,6 @@ public TransportChannelHandler initializePipeline(
       RpcHandler channelRpcHandler) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
-      ChunkFetchRequestHandler chunkFetchHandler =
-        createChunkFetchHandler(channelHandler, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline()
         .addLast("encoder", ENCODER)
         .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
@@ -200,6 +198,9 @@ public TransportChannelHandler initializePipeline(
         .addLast("handler", channelHandler);
       // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
       if (chunkFetchWorkers != null) {
+        ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
+          channelHandler.getClient(), rpcHandler.getStreamManager(),
+          conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
         pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
       }
       return channelHandler;
@@ -217,19 +218,19 @@ public TransportChannelHandler initializePipeline(
   private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
     TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
     TransportClient client = new TransportClient(channel, responseHandler);
+    boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
+    ChunkFetchRequestHandler chunkFetchRequestHandler;
+    if (!separateChunkFetchRequest) {
+      chunkFetchRequestHandler = new ChunkFetchRequestHandler(
+        client, rpcHandler.getStreamManager(),
+        conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
+    } else {
+      chunkFetchRequestHandler = null;
+    }
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
-      rpcHandler, conf.maxChunksBeingTransferred());
+      rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections, this);
-  }
-
-  /**
-   * Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
-   */
-  private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
-      RpcHandler rpcHandler) {
-    return new ChunkFetchRequestHandler(channelHandler.getClient(),
-      rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
+      conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
   }
 
   public TransportConf getConf() { return conf; }
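
The hunks above lean on a stock Netty feature: pipeline.addLast(group, name, handler) pins one handler's callbacks to its own EventExecutorGroup, so chunk-fetch work runs off the channel's event loop. A minimal, self-contained sketch of that mechanism follows; it is not part of the patch, and it substitutes an EmbeddedChannel, a DefaultEventExecutorGroup and a plain String message for the real transport channel, the chunkFetchWorkers pool and a decoded ChunkFetchRequest.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

import java.util.concurrent.CountDownLatch;

public class DedicatedHandlerSketch {
  public static void main(String[] args) throws Exception {
    // Stand-in for TransportContext's chunkFetchWorkers event loop group.
    DefaultEventExecutorGroup chunkFetchWorkers = new DefaultEventExecutorGroup(2);
    CountDownLatch handled = new CountDownLatch(1);

    EmbeddedChannel channel = new EmbeddedChannel();
    // Same Netty overload the patch uses: addLast(group, name, handler) makes this handler's
    // callbacks run on the given executor group instead of the channel's own event loop.
    channel.pipeline().addLast(chunkFetchWorkers, "chunkFetchHandler",
        new ChannelInboundHandlerAdapter() {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("handled " + msg + " on " + Thread.currentThread().getName());
            handled.countDown();
          }
        });

    channel.writeInbound("chunk-fetch-request"); // stand-in for a decoded ChunkFetchRequest
    handled.await();                             // wait for the worker thread to run the handler
    chunkFetchWorkers.shutdownGracefully();
  }
}

Run on its own, the handler reports a defaultEventExecutorGroup-* worker thread rather than the channel's event loop, which is the isolation the shuffle server gets only when conf.separateChunkFetchRequest() is true.
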
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index 94412c4db559..82810dacdad8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -55,14 +55,17 @@ public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+    ChannelFuture channelFuture;
+    if (syncModeEnabled) {
+      channelFuture = channel.writeAndFlush(result).await();
+    } else {
+      channelFuture = channel.writeAndFlush(result);
+    }
+    return channelFuture.addListener((ChannelFutureListener) future -> {
       if (future.isSuccess()) {
         logger.trace("Sent result {} to client {}", result, remoteAddress);
       } else {
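
The new syncModeEnabled flag only changes whether the flush is awaited before the listener is attached. The sketch below is a stand-alone illustration of those two code paths, not the Spark handler itself; it assumes an EmbeddedChannel and a String payload in place of the real transport channel and the encoded chunk response.

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.embedded.EmbeddedChannel;

public class FlushModeSketch {

  static ChannelFuture respond(EmbeddedChannel channel, Object result, boolean syncModeEnabled)
      throws InterruptedException {
    ChannelFuture channelFuture;
    if (syncModeEnabled) {
      // Dedicated-event-loop mode: block the chunk-fetch worker thread until the flush finishes,
      // so slow writes throttle this pool instead of piling up on the shared event loop.
      channelFuture = channel.writeAndFlush(result).await();
    } else {
      // Inline mode: plain asynchronous flush, exactly the pre-patch behaviour.
      channelFuture = channel.writeAndFlush(result);
    }
    return channelFuture.addListener((ChannelFutureListener) future -> {
      if (future.isSuccess()) {
        System.out.println("Sent " + result);
      } else {
        System.out.println("Failed: " + future.cause());
      }
    });
  }

  public static void main(String[] args) throws InterruptedException {
    EmbeddedChannel channel = new EmbeddedChannel();
    respond(channel, "chunk-0", true);
    respond(channel, "chunk-1", false);
    System.out.println("outbound messages: " + channel.outboundMessages());
  }
}

The patch passes true only for the handler installed on the dedicated chunkFetchWorkers group (see the TransportContext hunk above), so the blocking await() never runs on the channel's own event loop.
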
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index 31371f6970ff..e53a0c1a0852 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -58,6 +58,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {

diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
   /** The max number of chunks being transferred and not finished yet. */
   private final long maxChunksBeingTransferred;
 
+  /** The dedicated ChannelHandler for ChunkFetchRequest messages. */
+  private final ChunkFetchRequestHandler chunkFetchRequestHandler;
+
   public TransportRequestHandler(
       Channel channel,
       TransportClient reverseClient,
       RpcHandler rpcHandler,
-      Long maxChunksBeingTransferred) {
+      Long maxChunksBeingTransferred,
+      ChunkFetchRequestHandler chunkFetchRequestHandler) {
     this.channel = channel;
     this.reverseClient = reverseClient;
     this.rpcHandler = rpcHandler;
     this.streamManager = rpcHandler.getStreamManager();
     this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+    this.chunkFetchRequestHandler = chunkFetchRequestHandler;
   }
 
   @Override
@@ -97,8 +102,10 @@ public void channelInactive() {
   }
 
   @Override
-  public void handle(RequestMessage request) {
-    if (request instanceof RpcRequest) {
+  public void handle(RequestMessage request) throws Exception {
+    if (request instanceof ChunkFetchRequest) {
+      chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
+    } else if (request instanceof RpcRequest) {
       processRpcRequest((RpcRequest) request);
     } else if (request instanceof OneWayMessage) {
       processOneWayMessage((OneWayMessage) request);
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index cc0f2919568a..f0500d0053ea 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -18,6 +18,7 @@
 package org.apache.spark.network.util;
 
 import java.util.Locale;
+import java.util.NoSuchElementException;
 import java.util.Properties;
 
 import com.google.common.primitives.Ints;
@@ -316,7 +317,8 @@ public long maxChunksBeingTransferred() {
 
   /**
    * Percentage of io.serverThreads used by netty to process ChunkFetchRequest.
-   * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
+   * When the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set,
+   * shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages.
    * Although when calling the async writeAndFlush on the underlying channel to send
    * response back to client, the I/O on the channel is still being handled by
    * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup
@@ -339,12 +341,25 @@ public int chunkFetchHandlerThreads() {
       return 0;
     }
     int chunkFetchHandlerThreadsPercent =
-      conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
+      Integer.parseInt(conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent"));
     int threads =
       this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
     return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
   }
 
+  /**
+   * Whether to use a separate EventLoopGroup to process ChunkFetchRequest messages, it is decided
+   * by the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set or not.
+   */
+  public boolean separateChunkFetchRequest() {
+    try {
+      conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent");
+      return true;
+    } catch (NoSuchElementException e) {
+      return false;
+    }
+  }
+
   /**
    * Whether to use the old protocol while doing the shuffle block fetching.
    * It is only enabled while we need the compatibility in the scenario of new spark version
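
The arithmetic in chunkFetchHandlerThreads() is worth spelling out: the percentage is applied to io.serverThreads when that is set, otherwise to 2 * availableProcessors, and the result is rounded up (the 100.0 literal keeps the division from flooring to zero). The following sketch repeats only that formula with hypothetical inputs; the helper name and the numbers are illustrative, not from the patch.

public class ChunkFetchThreadsSketch {

  // Mirrors the formula in TransportConf.chunkFetchHandlerThreads().
  static int chunkFetchHandlerThreads(int serverThreads, int availableProcessors, int percent) {
    int threads = serverThreads > 0 ? serverThreads : 2 * availableProcessors;
    return (int) Math.ceil(threads * (percent / 100.0));
  }

  public static void main(String[] args) {
    // io.serverThreads unset on an 8-core host, percent = 100 -> all 16 threads
    System.out.println(chunkFetchHandlerThreads(0, 8, 100)); // 16
    // 16 server threads, percent = 87 -> ceil(13.92) = 14 dedicated chunk-fetch threads
    System.out.println(chunkFetchHandlerThreads(16, 8, 87)); // 14
  }
}
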
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
index 7e30ed4048ca..addb4ff33274 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
@@ -22,7 +22,6 @@
 import java.util.List;
 
 import io.netty.channel.Channel;
-import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,6 +32,7 @@
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.protocol.*;
+import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.apache.spark.network.server.NoOpRpcHandler;
 import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
@@ -68,7 +68,7 @@ public void handleChunkFetchRequest() throws Exception {
     long streamId = streamManager.registerStream("test-app", managedBuffers.iterator(), channel);
     TransportClient reverseClient = mock(TransportClient.class);
     ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient,
-      rpcHandler.getStreamManager(), 2L);
+      rpcHandler.getStreamManager(), 2L, false);
 
     RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0));
     requestHandler.channelRead(context, request0);
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
index a43a65904868..0a6447176237 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java
@@ -39,7 +39,7 @@ public class TransportRequestHandlerSuite {
 
   @Test
-  public void handleStreamRequest() {
+  public void handleStreamRequest() throws Exception {
     RpcHandler rpcHandler = new NoOpRpcHandler();
     OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager());
     Channel channel = mock(Channel.class);
@@ -66,7 +66,7 @@ public void handleStreamRequest() {
 
     TransportClient reverseClient = mock(TransportClient.class);
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
-      rpcHandler, 2L);
+      rpcHandler, 2L, null);
 
     RequestMessage request0 = new StreamRequest(String.format("%d_%d", streamId, 0));
     requestHandler.handle(request0);
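
The test updates above reflect the new constructor shapes: ChunkFetchRequestHandlerSuite passes false to keep the asynchronous flush under test, and TransportRequestHandlerSuite can pass null for the new handler argument because handleStreamRequest never issues a ChunkFetchRequest. For reference, here is a stand-alone model of the new dispatch order in TransportRequestHandler.handle(); the message types are hypothetical stand-ins, not the real Spark protocol classes.

public class HandleDispatchSketch {

  interface RequestMessage {}
  static final class ChunkFetchRequest implements RequestMessage {}
  static final class RpcRequest implements RequestMessage {}
  static final class OneWayMessage implements RequestMessage {}

  // Mirrors the new ordering in handle(): chunk fetches are checked first and forwarded to the
  // inline ChunkFetchRequestHandler; every other message keeps its old path.
  static String handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      return "chunkFetchRequestHandler.processFetchRequest(channel, request)";
    } else if (request instanceof RpcRequest) {
      return "processRpcRequest(request)";
    } else if (request instanceof OneWayMessage) {
      return "processOneWayMessage(request)";
    }
    return "unhandled request type";
  }

  public static void main(String[] args) {
    System.out.println(handle(new ChunkFetchRequest()));
    System.out.println(handle(new RpcRequest()));
  }
}
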
From 34b52ef36420ecdbc9f70ee3dc36a3e72b29de1f Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 24 Mar 2020 19:39:10 +0800
Subject: [PATCH 2/3] .

---
 .../main/java/org/apache/spark/network/TransportContext.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 73b78520f9b5..a0de9df1986f 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -219,13 +219,11 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
     TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
     TransportClient client = new TransportClient(channel, responseHandler);
     boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
-    ChunkFetchRequestHandler chunkFetchRequestHandler;
+    ChunkFetchRequestHandler chunkFetchRequestHandler = null;
     if (!separateChunkFetchRequest) {
       chunkFetchRequestHandler = new ChunkFetchRequestHandler(
         client, rpcHandler.getStreamManager(),
         conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
-    } else {
-      chunkFetchRequestHandler = null;
     }
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
       rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);

From 4f42083b43058d8f7276e5f095dc4b432c8b2b71 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 25 Mar 2020 13:46:14 +0800
Subject: [PATCH 3/3] address comment

---
 .../java/org/apache/spark/network/util/TransportConf.java | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f0500d0053ea..6c37f9a38237 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -18,7 +18,6 @@
 package org.apache.spark.network.util;
 
 import java.util.Locale;
-import java.util.NoSuchElementException;
 import java.util.Properties;
 
 import com.google.common.primitives.Ints;
@@ -352,12 +351,7 @@ public int chunkFetchHandlerThreads() {
    * by the config `spark.shuffle.server.chunkFetchHandlerThreadsPercent` is set or not.
    */
   public boolean separateChunkFetchRequest() {
-    try {
-      conf.get("spark.shuffle.server.chunkFetchHandlerThreadsPercent");
-      return true;
-    } catch (NoSuchElementException e) {
-      return false;
-    }
+    return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0) > 0;
   }
 
   /**
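
After PATCH 3/3 the feature toggle reduces to a single numeric check on the same key that sizes the pool. Below is a sketch of the resulting behaviour, using java.util.Properties as a stand-in for the ConfigProvider behind conf; only the config key and the getInt(..., 0) > 0 logic come from the diff above.

import java.util.Properties;

public class SeparateChunkFetchSketch {

  // Mirrors the final form of TransportConf.separateChunkFetchRequest().
  static boolean separateChunkFetchRequest(Properties conf) {
    return Integer.parseInt(conf.getProperty(
        "spark.shuffle.server.chunkFetchHandlerThreadsPercent", "0")) > 0;
  }

  public static void main(String[] args) {
    Properties unset = new Properties();
    System.out.println(separateChunkFetchRequest(unset));   // false -> no dedicated event loop

    Properties set = new Properties();
    set.setProperty("spark.shuffle.server.chunkFetchHandlerThreadsPercent", "100");
    System.out.println(separateChunkFetchRequest(set));     // true -> dedicated event loop, sync flush
  }
}

One consequence of the getInt(..., 0) > 0 form is that explicitly setting the percentage to 0 behaves the same as leaving it unset: no dedicated event loop and no synchronous flush.
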