
Commit 1f492ee (2 parents: 5bb5f08 + 4a11209)

Merge remote-tracking branch 'upstream/master' into SPARK-25483

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UnsafeArrayDataBenchmark.scala

File tree

104 files changed: +1607, -1129 lines


common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 63 additions & 3 deletions
@@ -21,6 +21,8 @@
 import java.util.List;

 import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 import org.slf4j.Logger;
@@ -32,11 +34,13 @@
 import org.apache.spark.network.client.TransportResponseHandler;
 import org.apache.spark.network.protocol.MessageDecoder;
 import org.apache.spark.network.protocol.MessageEncoder;
+import org.apache.spark.network.server.ChunkFetchRequestHandler;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.TransportChannelHandler;
 import org.apache.spark.network.server.TransportRequestHandler;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
+import org.apache.spark.network.util.IOMode;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
 import org.apache.spark.network.util.TransportFrameDecoder;
@@ -61,6 +65,7 @@ public class TransportContext {
   private final TransportConf conf;
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
+  private final boolean isClientOnly;

   /**
    * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
@@ -77,17 +82,54 @@ public class TransportContext {
   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

+  // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling
+  // max number of TransportServer worker threads that are blocked on writing response
+  // of ChunkFetchRequest message back to the client via the underlying channel.
+  private static EventLoopGroup chunkFetchWorkers;
+
   public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
-    this(conf, rpcHandler, false);
+    this(conf, rpcHandler, false, false);
   }

   public TransportContext(
       TransportConf conf,
       RpcHandler rpcHandler,
       boolean closeIdleConnections) {
+    this(conf, rpcHandler, closeIdleConnections, false);
+  }
+
+  /**
+   * Enables TransportContext initialization for underlying client and server.
+   *
+   * @param conf TransportConf
+   * @param rpcHandler RpcHandler responsible for handling requests and responses.
+   * @param closeIdleConnections Close idle connections if it is set to true.
+   * @param isClientOnly This config indicates the TransportContext is only used by a client.
+   *                     This config is more important when external shuffle is enabled.
+   *                     It stops creating extra event loop and subsequent thread pool
+   *                     for shuffle clients to handle chunked fetch requests.
+   */
+  public TransportContext(
+      TransportConf conf,
+      RpcHandler rpcHandler,
+      boolean closeIdleConnections,
+      boolean isClientOnly) {
     this.conf = conf;
     this.rpcHandler = rpcHandler;
     this.closeIdleConnections = closeIdleConnections;
+    this.isClientOnly = isClientOnly;
+
+    synchronized(TransportContext.class) {
+      if (chunkFetchWorkers == null &&
+          conf.getModuleName() != null &&
+          conf.getModuleName().equalsIgnoreCase("shuffle") &&
+          !isClientOnly) {
+        chunkFetchWorkers = NettyUtils.createEventLoop(
+            IOMode.valueOf(conf.ioMode()),
+            conf.chunkFetchHandlerThreads(),
+            "shuffle-chunk-fetch-handler");
+      }
+    }
   }

   /**
@@ -144,14 +186,23 @@ public TransportChannelHandler initializePipeline(
       RpcHandler channelRpcHandler) {
     try {
       TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
-      channel.pipeline()
+      ChunkFetchRequestHandler chunkFetchHandler =
+        createChunkFetchHandler(channelHandler, channelRpcHandler);
+      ChannelPipeline pipeline = channel.pipeline()
         .addLast("encoder", ENCODER)
         .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
         .addLast("decoder", DECODER)
-        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
+        .addLast("idleStateHandler",
+          new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
         // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
         // would require more logic to guarantee if this were not part of the same event loop.
         .addLast("handler", channelHandler);
+      // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
+      if (conf.getModuleName() != null &&
+          conf.getModuleName().equalsIgnoreCase("shuffle")
+          && !isClientOnly) {
+        pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
+      }
       return channelHandler;
     } catch (RuntimeException e) {
       logger.error("Error while initializing Netty pipeline", e);
@@ -173,5 +224,14 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
       conf.connectionTimeoutMs(), closeIdleConnections);
   }

+  /**
+   * Creates the dedicated ChannelHandler for ChunkFetchRequest messages.
+   */
+  private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler,
+      RpcHandler rpcHandler) {
+    return new ChunkFetchRequestHandler(channelHandler.getClient(),
+      rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred());
+  }
+
   public TransportConf getConf() { return conf; }
 }
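
The new four-argument constructor above is the hook that lets pure shuffle clients skip the dedicated chunk-fetch event loop. A minimal usage sketch follows; it is not part of the commit. The TransportContextUsageSketch class, the appRpcHandler parameter, and the use of MapConfigProvider.EMPTY are illustrative assumptions, while TransportConf, RpcHandler, and the new constructor semantics come from the diff above.

package org.apache.spark.network.example;  // hypothetical package for this sketch

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class TransportContextUsageSketch {

  // The RpcHandler is assumed to be supplied by the application; we do not construct one here.
  public static void build(RpcHandler appRpcHandler) {
    // Module name "shuffle" is what gates creation of the chunk-fetch worker group.
    TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);

    // Server side: isClientOnly = false, so the shared "shuffle-chunk-fetch-handler"
    // EventLoopGroup is created lazily in the constructor.
    TransportContext serverContext =
      new TransportContext(conf, appRpcHandler, false, false);

    // Client-only side (e.g. an executor that only fetches blocks): isClientOnly = true
    // skips creating the extra event loop and its thread pool.
    TransportContext clientContext =
      new TransportContext(conf, appRpcHandler, false, true);
  }
}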
common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java (new file)

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending response
+ * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
+ * channel could potentially be blocked due to disk contentions. If several hundreds of clients
+ * send ChunkFetchRequest to the server at the same time, it could potentially occupying all
+ * threads from TransportServer's default EventLoopGroup for waiting for disk reads before it
+ * can send the block data back to the client as part of the ChunkFetchSuccess messages. As a
+ * result, it would leave no threads left to process other RPC messages, which takes much less
+ * time to process, and could lead to client timing out on either performing SASL authentication,
+ * registering executors, or waiting for response for an OpenBlocks messages.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet. */
+  private final long maxChunksBeingTransferred;
+
+  public ChunkFetchRequestHandler(
+      TransportClient client,
+      StreamManager streamManager,
+      Long maxChunksBeingTransferred) {
+    this.client = client;
+    this.streamManager = streamManager;
+    this.maxChunksBeingTransferred = maxChunksBeingTransferred;
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+    logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause);
+    ctx.close();
+  }
+
+  @Override
+  protected void channelRead0(
+      ChannelHandlerContext ctx,
+      final ChunkFetchRequest msg) throws Exception {
+    Channel channel = ctx.channel();
+    if (logger.isTraceEnabled()) {
+      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
+        msg.streamChunkId);
+    }
+    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
+    if (chunksBeingTransferred >= maxChunksBeingTransferred) {
+      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
+        chunksBeingTransferred, maxChunksBeingTransferred);
+      channel.close();
+      return;
+    }
+    ManagedBuffer buf;
+    try {
+      streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
+      streamManager.registerChannel(channel, msg.streamChunkId.streamId);
+      buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
+    } catch (Exception e) {
+      logger.error(String.format("Error opening block %s for request from %s",
+        msg.streamChunkId, getRemoteAddress(channel)), e);
+      respond(channel, new ChunkFetchFailure(msg.streamChunkId,
+        Throwables.getStackTraceAsString(e)));
+      return;
+    }
+
+    streamManager.chunkBeingSent(msg.streamChunkId.streamId);
+    respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener(
+      (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId));
+  }
+
+  /**
+   * The invocation to channel.writeAndFlush is async, and the actual I/O on the
+   * channel will be handled by the EventLoop the channel is registered to. So even
+   * though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O,
+   * which is the potentially blocking call that could deplete server handler threads, is still
+   * being processed by TransportServer's default EventLoopGroup. In order to throttle the max
+   * number of threads that channel I/O for sending response to ChunkFetchRequest, the thread
+   * calling channel.writeAndFlush will wait for the completion of sending response back to
+   * client by invoking await(). This will throttle the rate at which threads from
+   * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's
+   * default EventLoopGroup, thus making sure that we can reserve some threads in
+   * TransportServer's default EventLoopGroup for handling other RPC messages.
+   */
+  private ChannelFuture respond(
+      final Channel channel,
+      final Encodable result) throws InterruptedException {
+    final SocketAddress remoteAddress = channel.remoteAddress();
+    return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> {
+      if (future.isSuccess()) {
+        logger.trace("Sent result {} to client {}", result, remoteAddress);
+      } else {
+        logger.error(String.format("Error sending result %s to %s; closing connection",
+          result, remoteAddress), future.cause());
+        channel.close();
+      }
+    });
+  }
+}
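
What makes this handler safe to block is not the class itself but where it runs: initializePipeline registers it with the dedicated chunkFetchWorkers group via the three-argument addLast. The stripped-down sketch below shows that Netty pattern in isolation, a plain Netty server with an invented SlowReadHandler bound to its own executor group; it uses DefaultEventExecutorGroup for simplicity where the commit uses an EventLoopGroup from NettyUtils.createEventLoop, and none of the names are Spark's.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;

public class DedicatedHandlerGroupSketch {

  // Handler whose work may block (e.g. disk reads); it must not hog the I/O threads.
  static class SlowReadHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
      // Potentially blocking work happens here, on the dedicated executor group,
      // while the channel's own event loop stays free for other messages.
      ctx.writeAndFlush(msg.retain());
    }
  }

  public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
    NioEventLoopGroup workerGroup = new NioEventLoopGroup();           // default I/O threads
    DefaultEventExecutorGroup blockingGroup = new DefaultEventExecutorGroup(4);

    ServerBootstrap bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)
      .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
          // The three-argument addLast binds this handler to the dedicated group,
          // mirroring pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", ...).
          ch.pipeline().addLast(blockingGroup, "slowReadHandler", new SlowReadHandler());
        }
      });

    bootstrap.bind(0).sync().channel().closeFuture().sync();
  }
}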

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 18 additions & 3 deletions
@@ -18,14 +18,16 @@
 package org.apache.spark.network.server;

 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportResponseHandler;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.Message;
 import org.apache.spark.network.protocol.RequestMessage;
 import org.apache.spark.network.protocol.ResponseMessage;
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -47,7 +49,7 @@
  * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not
  * timeout if the client is continuously sending but getting no responses, for simplicity.
  */
-public class TransportChannelHandler extends ChannelInboundHandlerAdapter {
+public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
   private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class);

   private final TransportClient client;
@@ -112,8 +114,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     super.channelInactive(ctx);
   }

+  /**
+   * Overwrite acceptInboundMessage to properly delegate ChunkFetchRequest messages
+   * to ChunkFetchRequestHandler.
+   */
   @Override
-  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
+  public boolean acceptInboundMessage(Object msg) throws Exception {
+    if (msg instanceof ChunkFetchRequest) {
+      return false;
+    } else {
+      return super.acceptInboundMessage(msg);
+    }
+  }
+
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
     if (request instanceof RequestMessage) {
       requestHandler.handle((RequestMessage) request);
     } else if (request instanceof ResponseMessage) {
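
The acceptInboundMessage override relies on a SimpleChannelInboundHandler detail: when a handler declines a message, Netty fires it onward to the next pipeline stage instead of calling channelRead0, which is how ChunkFetchRequest messages reach the ChunkFetchRequestHandler added after this handler. Below is a self-contained sketch of just that mechanism, using invented FetchMsg/OtherMsg types and an EmbeddedChannel rather than Spark's real message classes.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;

public class AcceptInboundMessageSketch {

  // Invented message types standing in for ChunkFetchRequest vs. other RequestMessages.
  static class FetchMsg {}
  static class OtherMsg {}

  // Mirrors TransportChannelHandler: handles everything except FetchMsg, which it
  // declines so that the next handler in the pipeline sees it.
  static class GeneralHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    public boolean acceptInboundMessage(Object msg) {
      return !(msg instanceof FetchMsg);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
      System.out.println("GeneralHandler handled " + msg.getClass().getSimpleName());
    }
  }

  // Mirrors ChunkFetchRequestHandler: typed on FetchMsg, so it only sees fetches.
  static class FetchHandler extends SimpleChannelInboundHandler<FetchMsg> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FetchMsg msg) {
      System.out.println("FetchHandler handled the fetch");
    }
  }

  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(new GeneralHandler(), new FetchHandler());
    channel.writeInbound(new OtherMsg());  // consumed by GeneralHandler
    channel.writeInbound(new FetchMsg());  // declined, falls through to FetchHandler
    channel.finish();
  }
}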

common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java

Lines changed: 2 additions & 33 deletions
@@ -24,6 +24,7 @@
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -97,9 +98,7 @@ public void channelInactive() {

   @Override
   public void handle(RequestMessage request) {
-    if (request instanceof ChunkFetchRequest) {
-      processFetchRequest((ChunkFetchRequest) request);
-    } else if (request instanceof RpcRequest) {
+    if (request instanceof RpcRequest) {
       processRpcRequest((RpcRequest) request);
     } else if (request instanceof OneWayMessage) {
       processOneWayMessage((OneWayMessage) request);
@@ -112,36 +111,6 @@ public void handle(RequestMessage request) {
     }
   }

-  private void processFetchRequest(final ChunkFetchRequest req) {
-    if (logger.isTraceEnabled()) {
-      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
-        req.streamChunkId);
-    }
-    long chunksBeingTransferred = streamManager.chunksBeingTransferred();
-    if (chunksBeingTransferred >= maxChunksBeingTransferred) {
-      logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
-        chunksBeingTransferred, maxChunksBeingTransferred);
-      channel.close();
-      return;
-    }
-    ManagedBuffer buf;
-    try {
-      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
-      streamManager.registerChannel(channel, req.streamChunkId.streamId);
-      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
-    } catch (Exception e) {
-      logger.error(String.format("Error opening block %s for request from %s",
-        req.streamChunkId, getRemoteAddress(channel)), e);
-      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
-      return;
-    }
-
-    streamManager.chunkBeingSent(req.streamChunkId.streamId);
-    respond(new ChunkFetchSuccess(req.streamChunkId, buf)).addListener(future -> {
-      streamManager.chunkSent(req.streamChunkId.streamId);
-    });
-  }
-
   private void processStreamRequest(final StreamRequest req) {
     if (logger.isTraceEnabled()) {
       logger.trace("Received req from {} to fetch stream {}", getRemoteAddress(channel),
