diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
new file mode 100644
index 000000000000..8b246e978ea0
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NettyUnsafeUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundBuffer;
+
+/**
+ * Wraps some usages of netty's unsafe API, for ease of maintainability.
+ */
+@InterfaceAudience.Private
+public final class NettyUnsafeUtils {
+
+ private NettyUnsafeUtils() {
+ }
+
+ /**
+ * Directly closes the channel, setting SO_LINGER to 0 and skipping any handlers in the pipeline.
+ * This is useful for cases where it's important to immediately close without any delay.
+ * Otherwise, pipeline handlers and even general TCP flows can cause a normal close to take
+ * upwards of a few second or more. This will likely cause the client side to see either a
+ * "Connection reset by peer" or unexpected ConnectionClosedException.
+ *
+ * It's necessary to call this from within the channel's eventLoop!
+ */
+ public static void closeImmediately(Channel channel) {
+ assert channel.eventLoop().inEventLoop();
+ channel.config().setOption(ChannelOption.SO_LINGER, 0);
+ channel.unsafe().close(channel.voidPromise());
+ }
+
+ /**
+ * Get total bytes pending write to socket
+ */
+ public static long getTotalPendingOutboundBytes(Channel channel) {
+ ChannelOutboundBuffer outboundBuffer = channel.unsafe().outboundBuffer();
+ // can be null when the channel is closing
+ if (outboundBuffer == null) {
+ return 0;
+ }
+ return outboundBuffer.totalPendingWriteBytes();
+ }
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 98ecf8b8d92d..df2e335a718f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -46,6 +46,14 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String PROCESS_CALL_TIME_DESC = "Processing call time.";
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
+
+ String UNWRITABLE_TIME_NAME = "unwritableTime";
+ String UNWRITABLE_TIME_DESC =
+ "Time where an channel was unwritable due to having too many outbound bytes";
+ String MAX_OUTBOUND_BYTES_EXCEEDED_NAME = "maxOutboundBytesExceeded";
+ String MAX_OUTBOUND_BYTES_EXCEEDED_DESC =
+ "Number of times a connection was closed because the channel outbound "
+ + "bytes exceeded the configured max.";
String QUEUE_SIZE_NAME = "queueSize";
String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and "
+ "parsed and is waiting to run or is currently being executed.";
@@ -97,6 +105,10 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NETTY_DM_USAGE_NAME = "nettyDirectMemoryUsage";
String NETTY_DM_USAGE_DESC = "Current Netty direct memory usage.";
+ String NETTY_TOTAL_PENDING_OUTBOUND_NAME = "nettyTotalPendingOutboundBytes";
+ String NETTY_TOTAL_PENDING_OUTBOUND_DESC = "Current total bytes pending write to all channel";
+ String NETTY_MAX_PENDING_OUTBOUND_NAME = "nettyMaxPendingOutboundBytes";
+ String NETTY_MAX_PENDING_OUTBOUND_DESC = "Current maximum bytes pending write to any channel";
void authorizationSuccess();
@@ -121,4 +133,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
void processedCall(int processingTime);
void queuedAndProcessedCall(int totalTime);
+
+ void unwritableTime(long unwritableTime);
+
+ void maxOutboundBytesExceeded();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 9c75f4e6bcba..1a6d557d8adc 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hbase.metrics.ExceptionTrackingSourceImpl;
import org.apache.hadoop.hbase.metrics.Interns;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.metrics2.MetricHistogram;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -36,10 +37,12 @@ public class MetricsHBaseServerSourceImpl extends ExceptionTrackingSourceImpl
private final MutableFastCounter authenticationFallbacks;
private final MutableFastCounter sentBytes;
private final MutableFastCounter receivedBytes;
+ private final MutableFastCounter maxOutboundBytesExceeded;
private MetricHistogram queueCallTime;
private MetricHistogram processCallTime;
private MetricHistogram totalCallTime;
+ private MetricHistogram unwritableTime;
private MetricHistogram requestSize;
private MetricHistogram responseSize;
@@ -67,6 +70,10 @@ public MetricsHBaseServerSourceImpl(String metricsName, String metricsDescriptio
this.getMetricsRegistry().newTimeHistogram(PROCESS_CALL_TIME_NAME, PROCESS_CALL_TIME_DESC);
this.totalCallTime =
this.getMetricsRegistry().newTimeHistogram(TOTAL_CALL_TIME_NAME, TOTAL_CALL_TIME_DESC);
+ this.unwritableTime =
+ this.getMetricsRegistry().newTimeHistogram(UNWRITABLE_TIME_NAME, UNWRITABLE_TIME_DESC);
+ this.maxOutboundBytesExceeded = this.getMetricsRegistry()
+ .newCounter(MAX_OUTBOUND_BYTES_EXCEEDED_NAME, MAX_OUTBOUND_BYTES_EXCEEDED_DESC, 0);
this.requestSize =
this.getMetricsRegistry().newSizeHistogram(REQUEST_SIZE_NAME, REQUEST_SIZE_DESC);
this.responseSize =
@@ -133,6 +140,16 @@ public void queuedAndProcessedCall(int totalTime) {
totalCallTime.add(totalTime);
}
+ @Override
+ public void unwritableTime(long unwritableTime) {
+ this.unwritableTime.add(unwritableTime);
+ }
+
+ @Override
+ public void maxOutboundBytesExceeded() {
+ maxOutboundBytesExceeded.incr();
+ }
+
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
MetricsRecordBuilder mrb = metricsCollector.addRecord(metricsName);
@@ -177,6 +194,13 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
wrapper.getActiveScanRpcHandlerCount())
.addGauge(Interns.info(NETTY_DM_USAGE_NAME, NETTY_DM_USAGE_DESC),
wrapper.getNettyDmUsage());
+
+ Pair totalAndMax = wrapper.getTotalAndMaxNettyOutboundBytes();
+ mrb.addGauge(
+ Interns.info(NETTY_TOTAL_PENDING_OUTBOUND_NAME, NETTY_TOTAL_PENDING_OUTBOUND_DESC),
+ totalAndMax.getFirst());
+ mrb.addGauge(Interns.info(NETTY_MAX_PENDING_OUTBOUND_NAME, NETTY_MAX_PENDING_OUTBOUND_DESC),
+ totalAndMax.getSecond());
}
metricsRegistry.snapshot(mrb, all);
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
index 1a8980bbc7bd..bb376cba930d 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -64,4 +65,10 @@ public interface MetricsHBaseServerWrapper {
int getActiveScanRpcHandlerCount();
long getNettyDmUsage();
+
+ /**
+ * These two metrics are calculated together, so we want to return them in one call
+ * @return pair containing total (first) and max (second) pending outbound bytes.
+ */
+ Pair getTotalAndMaxNettyOutboundBytes();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index a4c73f925d3c..b5fbb5c43d15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -97,6 +97,14 @@ void totalCall(int totalTime) {
source.queuedAndProcessedCall(totalTime);
}
+ void unwritableTime(long unwritableTime) {
+ source.unwritableTime(unwritableTime);
+ }
+
+ void maxOutboundBytesExceeded() {
+ source.maxOutboundBytesExceeded();
+ }
+
public void exception(Throwable throwable) {
source.exception();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 857315568c5e..1fc1806265d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -209,4 +210,16 @@ public long getNettyDmUsage() {
return DirectMemoryUtils.getNettyDirectMemoryUsage();
}
+
+ @Override
+ public Pair getTotalAndMaxNettyOutboundBytes() {
+ if (
+ !isServerStarted() || this.server.getScheduler() == null
+ || !(this.server instanceof NettyRpcServer)
+ ) {
+ return Pair.newPair(0L, 0L);
+ }
+
+ return ((NettyRpcServer) server).getTotalAndMaxNettyOutboundBytes();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index 0b7badf7d815..722ee1d28c91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -37,6 +37,8 @@
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
@@ -53,6 +55,7 @@
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel;
+import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup;
import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup;
import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder;
@@ -84,6 +87,38 @@ public class NettyRpcServer extends RpcServer {
static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
static final String HEAP_ALLOCATOR_TYPE = "heap";
+ /**
+ * Low watermark for pending outbound bytes of a single netty channel. If the high watermark was
+ * exceeded, channel will have setAutoRead to true again. The server will start reading incoming
+ * bytes (requests) from the client channel.
+ */
+ public static final String CHANNEL_WRITABLE_LOW_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.low";
+ private static final int CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT = 0;
+
+ /**
+ * High watermark for pending outbound bytes of a single netty channel. If the number of pending
+ * outbound bytes exceeds this threshold, setAutoRead will be false for the channel. The server
+ * will stop reading incoming requests from the client channel.
+ *
+ * Note: any requests already in the call queue will still be processed.
+ */
+ public static final String CHANNEL_WRITABLE_HIGH_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.high";
+ private static final int CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT = 0;
+
+ /**
+ * Fatal watermark for pending outbound bytes of a single netty channel. If the number of pending
+ * outbound bytes exceeds this threshold, the connection will be forcibly closed so that memory
+ * can be reclaimed. The client will have to re-establish a new connection and retry any in-flight
+ * requests.
+ *
+ * Note: must be higher than the high watermark, otherwise it's ignored.
+ */
+ public static final String CHANNEL_WRITABLE_FATAL_WATERMARK_KEY =
+ "hbase.server.netty.writable.watermark.fatal";
+ private static final int CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT = 0;
+
private final InetSocketAddress bindAddress;
private final CountDownLatch closed = new CountDownLatch(1);
@@ -94,6 +129,9 @@ public class NettyRpcServer extends RpcServer {
private final AtomicReference keyStoreWatcher = new AtomicReference<>();
private final AtomicReference trustStoreWatcher = new AtomicReference<>();
+ private volatile int writeBufferFatalThreshold;
+ private volatile WriteBufferWaterMark writeBufferWaterMark;
+
public NettyRpcServer(Server server, String name, List services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
@@ -108,6 +146,10 @@ public NettyRpcServer(Server server, String name, List channelClass = config.serverChannelClass();
ServerBootstrap bootstrap = new ServerBootstrap().group(eventLoopGroup).channel(channelClass)
@@ -117,6 +159,7 @@ public NettyRpcServer(Server server, String name, List() {
@Override
protected void initChannel(Channel ch) throws Exception {
+ ch.config().setWriteBufferWaterMark(writeBufferWaterMark);
ch.config().setAllocator(channelAllocator);
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
@@ -124,12 +167,18 @@ protected void initChannel(Channel ch) throws Exception {
if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) {
initSSL(pipeline, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true));
}
+ NettyServerRpcConnection conn = createNettyServerRpcConnection(ch);
pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder)
- .addLast(createNettyRpcServerPreambleHandler())
+ .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn))
// We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may
// send RpcResponse to client.
- .addLast(NettyRpcServerResponseEncoder.NAME,
- new NettyRpcServerResponseEncoder(metrics));
+ .addLast(NettyRpcServerResponseEncoder.NAME, new NettyRpcServerResponseEncoder(metrics))
+ // Add writability handler after the response encoder, so we can abort writes before
+ // they get encoded, if the fatal threshold is exceeded. We pass in suppliers here so
+ // that the handler configs can be live updated via update_config.
+ .addLast(NettyRpcServerChannelWritabilityHandler.NAME,
+ new NettyRpcServerChannelWritabilityHandler(metrics, () -> writeBufferFatalThreshold,
+ () -> isWritabilityBackpressureEnabled()));
}
});
try {
@@ -142,6 +191,91 @@ protected void initChannel(Channel ch) throws Exception {
this.scheduler.init(new RpcSchedulerContext(this));
}
+ @Override
+ public void onConfigurationChange(Configuration newConf) {
+ super.onConfigurationChange(newConf);
+ configureNettyWatermarks(newConf);
+ }
+
+ private void configureNettyWatermarks(Configuration conf) {
+ int watermarkLow =
+ conf.getInt(CHANNEL_WRITABLE_LOW_WATERMARK_KEY, CHANNEL_WRITABLE_LOW_WATERMARK_DEFAULT);
+ int watermarkHigh =
+ conf.getInt(CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, CHANNEL_WRITABLE_HIGH_WATERMARK_DEFAULT);
+ int fatalThreshold =
+ conf.getInt(CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, CHANNEL_WRITABLE_FATAL_WATERMARK_DEFAULT);
+
+ WriteBufferWaterMark oldWaterMark = writeBufferWaterMark;
+ int oldFatalThreshold = writeBufferFatalThreshold;
+
+ boolean disabled = false;
+ if (watermarkHigh == 0 && watermarkLow == 0) {
+ // if both are 0, use the netty default, which we will treat as "disabled".
+ // when disabled, we won't manage autoRead in response to writability changes.
+ writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+ disabled = true;
+ } else {
+ // netty checks pendingOutboundBytes < watermarkLow. It can never be less than 0, so set to
+ // 1 to avoid confusing behavior.
+ if (watermarkLow == 0) {
+ LOG.warn(
+ "Detected a {} value of 0, which is impossible to achieve "
+ + "due to how netty evaluates these thresholds, setting to 1",
+ CHANNEL_WRITABLE_LOW_WATERMARK_KEY);
+ watermarkLow = 1;
+ }
+
+ // netty validates the watermarks and throws an exception if high < low, fail more gracefully
+ // by disabling the watermarks and warning.
+ if (watermarkHigh <= watermarkLow) {
+ LOG.warn(
+ "Detected {} value {}, lower than {} value {}. This will fail netty validation, "
+ + "so disabling",
+ CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, watermarkHigh, CHANNEL_WRITABLE_LOW_WATERMARK_KEY,
+ watermarkLow);
+ writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
+ } else {
+ writeBufferWaterMark = new WriteBufferWaterMark(watermarkLow, watermarkHigh);
+ }
+
+ // only apply this check when watermark is enabled. this way we give the operator some
+ // flexibility if they want to try enabling fatal threshold without backpressure.
+ if (fatalThreshold > 0 && fatalThreshold <= watermarkHigh) {
+ LOG.warn("Detected a {} value of {}, which is lower than the {} value of {}, ignoring.",
+ CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, fatalThreshold, CHANNEL_WRITABLE_HIGH_WATERMARK_KEY,
+ watermarkHigh);
+ fatalThreshold = 0;
+ }
+ }
+
+ writeBufferFatalThreshold = fatalThreshold;
+
+ if (
+ oldWaterMark != null && (oldWaterMark.low() != writeBufferWaterMark.low()
+ || oldWaterMark.high() != writeBufferWaterMark.high()
+ || oldFatalThreshold != writeBufferFatalThreshold)
+ ) {
+ LOG.info("Updated netty outbound write buffer watermarks: low={}, high={}, fatal={}",
+ disabled ? "disabled" : writeBufferWaterMark.low(),
+ disabled ? "disabled" : writeBufferWaterMark.high(),
+ writeBufferFatalThreshold <= 0 ? "disabled" : writeBufferFatalThreshold);
+ }
+
+ // update any existing channels
+ for (Channel channel : allChannels) {
+ channel.config().setWriteBufferWaterMark(writeBufferWaterMark);
+ // if disabling watermark, set auto read to true in case channel had been exceeding
+ // previous watermark
+ if (disabled) {
+ channel.config().setAutoRead(true);
+ }
+ }
+ }
+
+ public boolean isWritabilityBackpressureEnabled() {
+ return writeBufferWaterMark != WriteBufferWaterMark.DEFAULT;
+ }
+
private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
if (value != null) {
@@ -172,10 +306,10 @@ private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOExcept
}
}
- // will be overriden in tests
+ // will be overridden in tests
@InterfaceAudience.Private
- protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+ return new NettyServerRpcConnection(NettyRpcServer.this, channel);
}
@Override
@@ -296,4 +430,19 @@ SslContext getSslContext() throws X509Exception, IOException {
}
return result;
}
+
+ public int getWriteBufferFatalThreshold() {
+ return writeBufferFatalThreshold;
+ }
+
+ public Pair getTotalAndMaxNettyOutboundBytes() {
+ long total = 0;
+ long max = 0;
+ for (Channel channel : allChannels) {
+ long outbound = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+ total += outbound;
+ max = Math.max(max, outbound);
+ }
+ return Pair.newPair(total, max);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
new file mode 100644
index 000000000000..4b0b3878da81
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerChannelWritabilityHandler.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.function.BooleanSupplier;
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosedException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.NettyUnsafeUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil;
+
+/**
+ * Handler to enforce writability protections on our server channels:
+ * - Responds to channel writability events, which are triggered when the total pending bytes for a
+ * channel passes configured high and low watermarks. When high watermark is exceeded, the channel
+ * is setAutoRead(false). This way, we won't accept new requests from the client until some pending
+ * outbound bytes are successfully received by the client.
+ * - Pre-processes any channel write requests. If the total pending outbound bytes exceeds a fatal
+ * threshold, the channel is forcefully closed and the write is set to failed. This handler should
+ * be the last handler in the pipeline so that it's the first handler to receive any messages sent
+ * to channel.write() or channel.writeAndFlush().
+ */
+@InterfaceAudience.Private
+public class NettyRpcServerChannelWritabilityHandler extends ChannelDuplexHandler {
+
+ static final String NAME = "NettyRpcServerChannelWritabilityHandler";
+
+ private final MetricsHBaseServer metrics;
+ private final IntSupplier pendingBytesFatalThreshold;
+ private final BooleanSupplier isWritabilityBackpressureEnabled;
+
+ private boolean writable = true;
+ private long unwritableStartTime;
+
+ NettyRpcServerChannelWritabilityHandler(MetricsHBaseServer metrics,
+ IntSupplier pendingBytesFatalThreshold, BooleanSupplier isWritabilityBackpressureEnabled) {
+ this.metrics = metrics;
+ this.pendingBytesFatalThreshold = pendingBytesFatalThreshold;
+ this.isWritabilityBackpressureEnabled = isWritabilityBackpressureEnabled;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+ throws Exception {
+ if (handleFatalThreshold(ctx)) {
+ promise.setFailure(
+ new ConnectionClosedException("Channel outbound bytes exceeded fatal threshold"));
+ if (msg instanceof RpcResponse) {
+ ((RpcResponse) msg).done();
+ } else {
+ ReferenceCountUtil.release(msg);
+ }
+ return;
+ }
+ ctx.write(msg, promise);
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ if (isWritabilityBackpressureEnabled.getAsBoolean()) {
+ handleWritabilityChanged(ctx);
+ }
+ ctx.fireChannelWritabilityChanged();
+ }
+
+ private boolean handleFatalThreshold(ChannelHandlerContext ctx) {
+ int fatalThreshold = pendingBytesFatalThreshold.getAsInt();
+ if (fatalThreshold <= 0) {
+ return false;
+ }
+
+ Channel channel = ctx.channel();
+ long outboundBytes = NettyUnsafeUtils.getTotalPendingOutboundBytes(channel);
+ if (outboundBytes < fatalThreshold) {
+ return false;
+ }
+
+ if (channel.isOpen()) {
+ metrics.maxOutboundBytesExceeded();
+ RpcServer.LOG.warn(
+ "{}: Closing connection because outbound buffer size of {} exceeds fatal threshold of {}",
+ channel.remoteAddress(), outboundBytes, fatalThreshold);
+ NettyUnsafeUtils.closeImmediately(channel);
+ }
+
+ return true;
+ }
+
+ private void handleWritabilityChanged(ChannelHandlerContext ctx) {
+ boolean oldWritableValue = this.writable;
+
+ this.writable = ctx.channel().isWritable();
+ ctx.channel().config().setAutoRead(this.writable);
+
+ if (!oldWritableValue && this.writable) {
+ // changing from not writable to writable, update metrics
+ metrics.unwritableTime(EnvironmentEdgeManager.currentTime() - unwritableStartTime);
+ unwritableStartTime = 0;
+ } else if (oldWritableValue && !this.writable) {
+ // changing from writable to non-writable, set start time
+ unwritableStartTime = EnvironmentEdgeManager.currentTime();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
index 8269bbc60d88..b79a67f986e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java
@@ -22,7 +22,6 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
@@ -38,14 +37,15 @@ class NettyRpcServerPreambleHandler extends SimpleChannelInboundHandler
static final String DECODER_NAME = "preambleDecoder";
private final NettyRpcServer rpcServer;
+ private final NettyServerRpcConnection conn;
- public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer) {
+ public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcConnection conn) {
this.rpcServer = rpcServer;
+ this.conn = conn;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- NettyServerRpcConnection conn = createNettyServerRpcConnection(ctx.channel());
ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes());
msg.readBytes(buf);
buf.flip();
@@ -76,9 +76,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().remoteAddress(), cause);
NettyFutureUtils.safeClose(ctx);
}
-
- // will be overridden in tests
- protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
- return new NettyServerRpcConnection(rpcServer, channel);
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index fd0c6d75d888..4f0540da80a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -54,6 +54,6 @@ class NettyServerCall extends ServerCall {
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
- connection.channel.writeAndFlush(this);
+ connection.doRespond(this);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
index d5c408c23874..da4f70e3a247 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/FailingNettyRpcServer.java
@@ -49,12 +49,7 @@ public void processRequest(ByteBuff buf) throws IOException, InterruptedExceptio
}
@Override
- protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(FailingNettyRpcServer.this) {
- @Override
- protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
- return new FailingConnection(FailingNettyRpcServer.this, channel);
- }
- };
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+ return new FailingConnection(FailingNettyRpcServer.this, channel);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
index 6e5dfe87fc7b..7170413bee90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
+import org.apache.hadoop.hbase.util.Pair;
+
public class MetricsHBaseServerWrapperStub implements MetricsHBaseServerWrapper {
@Override
public long getTotalQueueSize() {
@@ -127,4 +129,9 @@ public int getMetaPriorityQueueLength() {
public int getActiveMetaPriorityRpcHandlerCount() {
return 1;
}
+
+ @Override
+ public Pair getTotalAndMaxNettyOutboundBytes() {
+ return Pair.newPair(100L, 5L);
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
new file mode 100644
index 000000000000..001f6dbd22c7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyChannelWritability.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
+import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos;
+
+@Category({ RPCTests.class, MediumTests.class })
+public class TestNettyChannelWritability {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestNettyChannelWritability.class);
+
+ private static final MetricsAssertHelper METRICS_ASSERT =
+ CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+ private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
+ private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+
+ /**
+ * Test that we properly send configured watermarks to netty, and trigger setWritable when
+ * necessary.
+ */
+ @Test
+ public void testNettyWritableWatermarks() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_LOW_WATERMARK_KEY, 1);
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_HIGH_WATERMARK_KEY, 2);
+
+ NettyRpcServer rpcServer = createRpcServer(conf, 0);
+ try {
+ sendAndReceive(conf, rpcServer, 5);
+ METRICS_ASSERT.assertCounterGt("unwritableTime_numOps", 0,
+ rpcServer.metrics.getMetricsSource());
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ /**
+ * Test that our fatal watermark is honored, which requires artificially causing some queueing so
+ * that pendingOutboundBytes increases.
+ */
+ @Test
+ public void testNettyWritableFatalThreshold() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setInt(NettyRpcServer.CHANNEL_WRITABLE_FATAL_WATERMARK_KEY, 1);
+
+ // flushAfter is 3 here, with requestCount 5 below. If we never flush, the WriteTasks will sit
+ // in the eventloop. So we flush a few at once, which will ensure that we hit fatal threshold
+ NettyRpcServer rpcServer = createRpcServer(conf, 3);
+ try {
+ CompletionException exception =
+ assertThrows(CompletionException.class, () -> sendAndReceive(conf, rpcServer, 5));
+ assertTrue(exception.getCause().getCause() instanceof ServiceException);
+ METRICS_ASSERT.assertCounterGt("maxOutboundBytesExceeded", 0,
+ rpcServer.metrics.getMetricsSource());
+ } finally {
+ rpcServer.stop();
+ }
+ }
+
+ private void sendAndReceive(Configuration conf, NettyRpcServer rpcServer, int requestCount)
+ throws Exception {
+ List cells = new ArrayList<>();
+ int count = 3;
+ for (int i = 0; i < count; i++) {
+ cells.add(CELL);
+ }
+
+ try (NettyRpcClient client = new NettyRpcClient(conf)) {
+ rpcServer.start();
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
+ newBlockingStub(client, rpcServer.getListenerAddress());
+ CompletableFuture[] futures = new CompletableFuture[requestCount];
+ for (int i = 0; i < requestCount; i++) {
+ futures[i] = CompletableFuture.runAsync(() -> {
+ try {
+ sendMessage(cells, stub);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ CompletableFuture.allOf(futures).join();
+ }
+ }
+
+ private void sendMessage(List| cells,
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub) throws Exception {
+ HBaseRpcController pcrc = new HBaseRpcControllerImpl(CellUtil.createCellScanner(cells));
+ String message = "hello";
+ assertEquals(message,
+ stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage(message).build())
+ .getMessage());
+ int index = 0;
+ CellScanner cellScanner = pcrc.cellScanner();
+ assertNotNull(cellScanner);
+ while (cellScanner.advance()) {
+ assertEquals(CELL, cellScanner.current());
+ index++;
+ }
+ assertEquals(cells.size(), index);
+ }
+
+ private NettyRpcServer createRpcServer(Configuration conf, int flushAfter) throws IOException {
+ String name = "testRpcServer";
+ ArrayList services =
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null));
+
+ InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
+ FifoRpcScheduler scheduler = new FifoRpcScheduler(conf, 1);
+
+ AtomicInteger writeCount = new AtomicInteger(0);
+
+ return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true) {
+ @Override
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+ return new NettyServerRpcConnection(this, channel) {
+ @Override
+ protected void doRespond(RpcResponse resp) {
+ if (writeCount.incrementAndGet() >= flushAfter) {
+ super.doRespond(resp);
+ } else {
+ channel.write(resp);
+ }
+ }
+ };
+ }
+ };
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
index 288bb3fe2624..c55568d392ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcMetrics.java
@@ -89,6 +89,9 @@ public void testWrapperSource() {
HELPER.assertGauge("numCallsInWriteQueue", 50, serverSource);
HELPER.assertGauge("numCallsInReadQueue", 50, serverSource);
HELPER.assertGauge("numCallsInScanQueue", 2, serverSource);
+ HELPER.assertGauge("nettyDirectMemoryUsage", 100, serverSource);
+ HELPER.assertGauge("nettyTotalPendingOutboundBytes", 100, serverSource);
+ HELPER.assertGauge("nettyMaxPendingOutboundBytes", 5, serverSource);
}
/**
@@ -100,6 +103,12 @@ public void testSourceMethods() {
new MetricsHBaseServer("HMaster", new MetricsHBaseServerWrapperStub());
MetricsHBaseServerSource serverSource = mrpc.getMetricsSource();
+ mrpc.unwritableTime(100);
+ mrpc.maxOutboundBytesExceeded();
+ mrpc.maxOutboundBytesExceeded();
+ HELPER.assertCounter("maxOutboundBytesExceeded", 2, serverSource);
+ HELPER.assertCounter("unwritableTime_NumOps", 1, serverSource);
+
for (int i = 0; i < 12; i++) {
mrpc.authenticationFailure();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
index 9f6b7d54430b..bc791754a12e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcSkipInitialSaslHandshake.java
@@ -28,7 +28,7 @@
import java.io.File;
import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -49,9 +49,7 @@
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
-import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
@@ -131,29 +129,15 @@ public void test() throws Exception {
.thenReturn(HBaseKerberosUtils.KRB_PRINCIPAL);
SecurityInfo.addInfo("TestProtobufRpcProto", securityInfoMock);
- final AtomicBoolean useSaslRef = new AtomicBoolean(false);
+ final AtomicReference conn = new AtomicReference<>(null);
NettyRpcServer rpcServer = new NettyRpcServer(null, getClass().getSimpleName(),
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
new InetSocketAddress(HOST, 0), serverConf, new FifoRpcScheduler(serverConf, 1), true) {
@Override
- protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
- return new NettyRpcServerPreambleHandler(this) {
- private NettyServerRpcConnection conn;
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- super.channelRead0(ctx, msg);
- useSaslRef.set(conn.useSasl);
-
- }
-
- @Override
- protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
- conn = super.createNettyServerRpcConnection(channel);
- return conn;
- }
- };
+ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channel) {
+ conn.set(super.createNettyServerRpcConnection(channel));
+ return conn.get();
}
};
@@ -167,7 +151,7 @@ protected NettyServerRpcConnection createNettyServerRpcConnection(Channel channe
stub.echo(null, TestProtos.EchoRequestProto.newBuilder().setMessage("test").build())
.getMessage();
assertTrue("test".equals(response));
- assertFalse(useSaslRef.get());
+ assertFalse(conn.get().useSasl);
} finally {
rpcServer.stop();
| |