Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory;
Expand All @@ -39,24 +40,46 @@
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {

protected static final String CLIENT_TCP_REUSEADDR = "hbase.ipc.client.tcpreuseaddr";

protected static final String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark";
protected static final String CLIENT_BUFFER_HIGH_WATERMARK =
"hbase.ipc.client.bufferhighwatermark";

protected static final int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 32 * 1024;
protected static final int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024;
protected static final boolean DEFAULT_SERVER_REUSEADDR = true;

protected final WriteBufferWaterMark writeBufferWaterMark;

final EventLoopGroup group;

final Class<? extends Channel> channelClass;

private final boolean shutdownGroupWhenClose;

protected final int bufferLowWatermark;
protected final int bufferHighWatermark;
protected final boolean tcpReuseAddr;

public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
MetricsConnection metrics) {
super(configuration, clusterId, localAddress, metrics);
this.bufferLowWatermark = conf.getInt(
CLIENT_BUFFER_LOW_WATERMARK, DEFAULT_CLIENT_BUFFER_LOW_WATERMARK);
this.bufferHighWatermark = conf.getInt(
CLIENT_BUFFER_HIGH_WATERMARK, DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK);
this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark);
this.tcpReuseAddr = conf.getBoolean(CLIENT_TCP_REUSEADDR, DEFAULT_SERVER_REUSEADDR);
Pair<EventLoopGroup, Class<? extends Channel>> groupAndChannelClass =
NettyRpcClientConfigHelper.getEventLoopConfig(conf);
NettyRpcClientConfigHelper.getEventLoopConfig(conf);
if (groupAndChannelClass == null) {
// Use our own EventLoopGroup.
int threadCount = conf.getInt(
NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0);
NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0);
this.group = new NioEventLoopGroup(threadCount,
new DefaultThreadFactory("RPCClient(own)-NioEventLoopGroup", true,
Thread.NORM_PRIORITY));
Thread.NORM_PRIORITY));
this.channelClass = NioSocketChannel.class;
this.shutdownGroupWhenClose = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@
import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;

import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
Expand All @@ -38,27 +47,12 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;

/**
* RPC connection implementation based on netty.
Expand Down Expand Up @@ -92,7 +86,7 @@ class NettyRpcConnection extends RpcConnection {
byte[] connectionHeaderPreamble = getConnectionHeaderPreamble();
this.connectionHeaderPreamble =
Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);
ConnectionHeader header = getConnectionHeader();
RPCProtos.ConnectionHeader header = getConnectionHeader();
this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());
this.connectionHeaderWithLength.writeInt(header.getSerializedSize());
header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
Expand Down Expand Up @@ -140,7 +134,7 @@ private void established(Channel ch) throws IOException {
p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
p.addBefore(addBeforeHandler, null,
new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor));
p.fireUserEventTriggered(BufferCallEvent.success());
p.fireUserEventTriggered(BufferCallBeforeInitHandler.BufferCallEvent.success());
}

private boolean reloginInProgress;
Expand Down Expand Up @@ -176,7 +170,7 @@ public void run() {
private void failInit(Channel ch, IOException e) {
synchronized (this) {
// fail all pending calls
ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e));
ch.pipeline().fireUserEventTriggered(BufferCallBeforeInitHandler.BufferCallEvent.fail(e));
shutdown0();
return;
}
Expand Down Expand Up @@ -257,7 +251,9 @@ private void connect() {
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO)
.option(ChannelOption.SO_REUSEADDR, rpcClient.tcpReuseAddr)
.handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr)
.option(ChannelOption.WRITE_BUFFER_WATER_MARK, rpcClient.writeBufferWaterMark)
.remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() {

@Override
Expand Down Expand Up @@ -311,7 +307,7 @@ public void run(Object parameter) {
}
}
}
}, new CancellationCallback() {
}, new HBaseRpcController.CancellationCallback() {

@Override
public void run(boolean cancelled) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
Expand All @@ -67,20 +68,46 @@ public class NettyRpcServer extends RpcServer {
* Tests may set this down from unlimited.
*/
public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY =
"hbase.netty.eventloop.rpcserver.thread.count";
"hbase.netty.eventloop.rpcserver.thread.count";
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;

protected static final String SERVER_TCP_BACKLOG = "hbase.ipc.server.tcpbacklog";
protected static final String SERVER_TCP_REUSEADDR = "hbase.ipc.server.tcpreuseaddr";

protected static final String SERVER_BUFFER_LOW_WATERMARK = "hbase.ipc.server.bufferlowwatermark";
protected static final String SERVER_BUFFER_HIGH_WATERMARK =
"hbase.ipc.server.bufferhighwatermark";

protected static final boolean DEFAULT_SERVER_REUSEADDR = true;
protected static final int DEFAULT_SERVER_BUFFER_LOW_WATERMARK = 32 * 1024;
protected static final int DEFAULT_SERVER_BUFFER_HIGH_WATERMARK = 64 * 1024;

private final InetSocketAddress bindAddress;

private final CountDownLatch closed = new CountDownLatch(1);
private final Channel serverChannel;
private final ChannelGroup allChannels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);

protected final WriteBufferWaterMark writeBufferWaterMark;

protected final int bufferLowWatermark;
protected final int bufferHighWatermark;

protected final boolean tcpReuseAddr;
protected final int tcpBacklog;

public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.bufferLowWatermark =
conf.getInt(SERVER_BUFFER_LOW_WATERMARK, DEFAULT_SERVER_BUFFER_LOW_WATERMARK);
this.bufferHighWatermark =
conf.getInt(SERVER_BUFFER_HIGH_WATERMARK, DEFAULT_SERVER_BUFFER_HIGH_WATERMARK);
this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark);
this.tcpBacklog = conf.getInt(SERVER_TCP_BACKLOG, -1);
this.tcpReuseAddr = conf.getBoolean(SERVER_TCP_REUSEADDR, DEFAULT_SERVER_REUSEADDR);
this.bindAddress = bindAddress;
EventLoopGroup eventLoopGroup;
Class<? extends ServerChannel> channelClass;
Expand All @@ -89,17 +116,21 @@ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterfa
eventLoopGroup = config.group();
channelClass = config.serverChannelClass();
} else {
int threadCount = server == null? EVENTLOOP_THREADCOUNT_DEFAULT:
server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
EVENTLOOP_THREADCOUNT_DEFAULT);
int threadCount = server == null ?
EVENTLOOP_THREADCOUNT_DEFAULT :
server.getConfiguration().getInt(HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY,
EVENTLOOP_THREADCOUNT_DEFAULT);
eventLoopGroup = new NioEventLoopGroup(threadCount,
new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
new DefaultThreadFactory("NettyRpcServer", true, Thread.MAX_PRIORITY));
channelClass = NioServerSocketChannel.class;
}
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
.option(ChannelOption.SO_REUSEADDR, tcpReuseAddr)
.childOption(ChannelOption.TCP_NODELAY, tcpNoDelay)
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_REUSEADDR, tcpReuseAddr)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark))
.childHandler(new ChannelInitializer<Channel>() {

@Override
Expand All @@ -114,6 +145,9 @@ protected void initChannel(Channel ch) throws Exception {
pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics));
}
});
if (tcpBacklog > -1) {
bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, tcpBacklog);
}
try {
serverChannel = bootstrap.bind(this.bindAddress).sync().channel();
LOG.info("Bind to {}", serverChannel.localAddress());
Expand Down