From 5064cc4eb651e15bbc9f143357079d719c6e3e95 Mon Sep 17 00:00:00 2001 From: "markrmiller@gmail.com" Date: Thu, 28 May 2020 13:55:19 -0500 Subject: [PATCH] HBASE-23113: Add additional Netty configuration for RPC. --- .../hadoop/hbase/ipc/NettyRpcClient.java | 29 +++++++++-- .../hadoop/hbase/ipc/NettyRpcConnection.java | 44 ++++++++--------- .../hadoop/hbase/ipc/NettyRpcServer.java | 48 ++++++++++++++++--- 3 files changed, 87 insertions(+), 34 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 4c85e3d51abe..9d71b502280e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -28,6 +28,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; @@ -39,24 +40,46 @@ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class NettyRpcClient extends AbstractRpcClient { + protected static final String CLIENT_TCP_REUSEADDR = "hbase.ipc.client.tcpreuseaddr"; + + protected static final String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark"; + protected static final String CLIENT_BUFFER_HIGH_WATERMARK = + "hbase.ipc.client.bufferhighwatermark"; + + protected static final int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 32 * 1024; + protected static final int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024; + protected static final boolean DEFAULT_SERVER_REUSEADDR = true; + + protected final WriteBufferWaterMark writeBufferWaterMark; + final EventLoopGroup group; final Class channelClass; private final boolean shutdownGroupWhenClose; + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; + protected final boolean tcpReuseAddr; + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { super(configuration, clusterId, localAddress, metrics); + this.bufferLowWatermark = conf.getInt( + CLIENT_BUFFER_LOW_WATERMARK, DEFAULT_CLIENT_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = conf.getInt( + CLIENT_BUFFER_HIGH_WATERMARK, DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK); + this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark); + this.tcpReuseAddr = conf.getBoolean(CLIENT_TCP_REUSEADDR, DEFAULT_SERVER_REUSEADDR); Pair> groupAndChannelClass = - NettyRpcClientConfigHelper.getEventLoopConfig(conf); + NettyRpcClientConfigHelper.getEventLoopConfig(conf); if (groupAndChannelClass == null) { // Use our own EventLoopGroup. int threadCount = conf.getInt( - NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0); + NettyRpcClientConfigHelper.HBASE_NETTY_EVENTLOOP_RPCCLIENT_THREADCOUNT_KEY, 0); this.group = new NioEventLoopGroup(threadCount, new DefaultThreadFactory("RPCClient(own)-NioEventLoopGroup", true, - Thread.NORM_PRIORITY)); + Thread.NORM_PRIORITY)); this.channelClass = NioSocketChannel.class; this.shutdownGroupWhenClose = true; } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 7d91fd9d6f04..f914b64085ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -21,11 +21,20 @@ import static org.apache.hadoop.hbase.ipc.CallEvent.Type.TIMEOUT; import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; - -import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; +import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; +import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; +import org.apache.hadoop.hbase.security.SaslChallengeDecoder; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; @@ -38,27 +47,12 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; +import org.apache.hbase.thirdparty.io.netty.handler.timeout.ReadTimeoutHandler; import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future; import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener; import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; -import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; -import org.apache.hadoop.hbase.security.SaslChallengeDecoder; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; /** * RPC connection implementation based on netty. @@ -92,7 +86,7 @@ class NettyRpcConnection extends RpcConnection { byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); this.connectionHeaderPreamble = Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble); - ConnectionHeader header = getConnectionHeader(); + RPCProtos.ConnectionHeader header = getConnectionHeader(); this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize()); this.connectionHeaderWithLength.writeInt(header.getSerializedSize()); header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength)); @@ -140,7 +134,7 @@ private void established(Channel ch) throws IOException { p.addBefore(addBeforeHandler, null, new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); p.addBefore(addBeforeHandler, null, new NettyRpcDuplexHandler(this, rpcClient.cellBlockBuilder, codec, compressor)); - p.fireUserEventTriggered(BufferCallEvent.success()); + p.fireUserEventTriggered(BufferCallBeforeInitHandler.BufferCallEvent.success()); } private boolean reloginInProgress; @@ -176,7 +170,7 @@ public void run() { private void failInit(Channel ch, IOException e) { synchronized (this) { // fail all pending calls - ch.pipeline().fireUserEventTriggered(BufferCallEvent.fail(e)); + ch.pipeline().fireUserEventTriggered(BufferCallBeforeInitHandler.BufferCallEvent.fail(e)); shutdown0(); return; } @@ -257,7 +251,9 @@ private void connect() { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) + .option(ChannelOption.SO_REUSEADDR, rpcClient.tcpReuseAddr) .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, rpcClient.writeBufferWaterMark) .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { @Override @@ -311,7 +307,7 @@ public void run(Object parameter) { } } } - }, new CancellationCallback() { + }, new HBaseRpcController.CancellationCallback() { @Override public void run(boolean cancelled) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index eab2a0ec85c7..66958d0f9dac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -46,6 +46,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -67,20 +68,46 @@ public class NettyRpcServer extends RpcServer { * Tests may set this down from unlimited. */ public static final String HBASE_NETTY_EVENTLOOP_RPCSERVER_THREADCOUNT_KEY = - "hbase.netty.eventloop.rpcserver.thread.count"; + "hbase.netty.eventloop.rpcserver.thread.count"; private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0; + protected static final String SERVER_TCP_BACKLOG = "hbase.ipc.server.tcpbacklog"; + protected static final String SERVER_TCP_REUSEADDR = "hbase.ipc.server.tcpreuseaddr"; + + protected static final String SERVER_BUFFER_LOW_WATERMARK = "hbase.ipc.server.bufferlowwatermark"; + protected static final String SERVER_BUFFER_HIGH_WATERMARK = + "hbase.ipc.server.bufferhighwatermark"; + + protected static final boolean DEFAULT_SERVER_REUSEADDR = true; + protected static final int DEFAULT_SERVER_BUFFER_LOW_WATERMARK = 32 * 1024; + protected static final int DEFAULT_SERVER_BUFFER_HIGH_WATERMARK = 64 * 1024; + private final InetSocketAddress bindAddress; private final CountDownLatch closed = new CountDownLatch(1); private final Channel serverChannel; private final ChannelGroup allChannels = - new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); + new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); + + protected final WriteBufferWaterMark writeBufferWaterMark; + + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; + + protected final boolean tcpReuseAddr; + protected final int tcpBacklog; public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); + this.bufferLowWatermark = + conf.getInt(SERVER_BUFFER_LOW_WATERMARK, DEFAULT_SERVER_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = + conf.getInt(SERVER_BUFFER_HIGH_WATERMARK, DEFAULT_SERVER_BUFFER_HIGH_WATERMARK); + this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark); + this.tcpBacklog = conf.getInt(SERVER_TCP_BACKLOG, -1); + this.tcpReuseAddr = conf.getBoolean(SERVER_TCP_REUSEADDR, DEFAULT_SERVER_REUSEADDR); this.bindAddress = bindAddress; EventLoopGroup eventLoopGroup; Class channelClass; @@ -89,17 +116,21 @@ public NettyRpcServer(Server server, String name, List() { @Override @@ -114,6 +145,9 @@ protected void initChannel(Channel ch) throws Exception { pipeline.addLast("encoder", new NettyRpcServerResponseEncoder(metrics)); } }); + if (tcpBacklog > -1) { + bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, tcpBacklog); + } try { serverChannel = bootstrap.bind(this.bindAddress).sync().channel(); LOG.info("Bind to {}", serverChannel.localAddress());