diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 29f51415dce7..9e988aa6fd75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -21,9 +21,7 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; import java.util.Collection; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -321,7 +319,7 @@ private int nextCallId() { * @return A pair with the Message response and the Cell data (if any). */ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc, - Message param, Message returnType, final User ticket, final InetSocketAddress isa) + Message param, Message returnType, final User ticket, final Address isa) throws ServiceException { BlockingRpcCallback done = new BlockingRpcCallback<>(); callMethod(md, hrc, param, returnType, ticket, isa, done); @@ -393,7 +391,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr, } Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, - final Message param, Message returnType, final User ticket, final InetSocketAddress inetAddr, + final Message param, Message returnType, final User ticket, final Address addr, final RpcCallback callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); cs.setStartTime(EnvironmentEdgeManager.currentTime()); @@ -408,7 +406,6 @@ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController cs.setNumActionsPerServer(numActions); } - final Address addr = Address.fromSocketAddress(inetAddr); final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr); Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType, hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback() { @@ -525,13 +522,6 @@ private static class AbstractRpcChannel { protected final Address addr; - // We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup - // per method call on the channel. If the remote target is removed or reprovisioned and - // its identity changes a new channel with a newly resolved InetSocketAddress will be - // created as part of retry, so caching here is fine. - // Normally, caching an InetSocketAddress is an anti-pattern. - protected InetSocketAddress isa; - protected final AbstractRpcClient rpcClient; protected final User ticket; @@ -582,22 +572,8 @@ protected BlockingRpcChannelImplementation(AbstractRpcClient rpcClient, Addre @Override public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { - // Look up remote address upon first call - if (isa == null) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - isa = Address.toSocketAddress(addr); - if (isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - isa = null; - throw new ServiceException(new UnknownHostException(addr + " could not be resolved")); - } - } return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType, - ticket, isa); + ticket, addr); } } @@ -616,24 +592,9 @@ public void callMethod(Descriptors.MethodDescriptor md, RpcController controller Message returnType, RpcCallback done) { HBaseRpcController configuredController = configureRpcController( Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call")); - // Look up remote address upon first call - if (isa == null || isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - isa = Address.toSocketAddress(addr); - if (isa.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - isa = null; - controller.setFailed(addr + " could not be resolved"); - return; - } - } // This method does not throw any exceptions, so the caller must provide a // HBaseRpcController which is used to pass the exceptions. - this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done); + this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index 10847c0155d0..df46322afd27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -35,7 +35,6 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; -import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; import java.util.ArrayDeque; import java.util.Locale; @@ -52,7 +51,6 @@ import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.HBaseSaslRpcClient; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -265,16 +263,7 @@ protected void setupConnection() throws IOException { if (this.rpcClient.localAddr != null) { this.socket.bind(this.rpcClient.localAddr); } - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); - if (remoteAddr.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } + InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO); this.socket.setSoTimeout(this.rpcClient.readTO); return; @@ -423,15 +412,8 @@ private boolean setupSaslConnection(final InputStream in2, final OutputStream ou if (this.metrics != null) { this.metrics.incrNsLookups(); } - InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); - if (serverAddr.isUnresolved()) { - if (this.metrics != null) { - this.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token, - serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed, + socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed, this.rpcClient.conf.get("hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)), this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index a0aba5635add..fb640e94ce79 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent; import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback; -import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler; import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler; import org.apache.hadoop.hbase.security.SaslChallengeDecoder; @@ -210,18 +209,9 @@ private void saslNegotiate(final Channel ch) { Promise saslPromise = ch.eventLoop().newPromise(); final NettyHBaseSaslRpcClientHandler saslHandler; try { - if (this.metrics != null) { - this.metrics.incrNsLookups(); - } - InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress()); - if (serverAddr.isUnresolved()) { - if (this.metrics != null) { - this.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token, - serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf); + ((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo, + rpcClient.fallbackAllowed, this.rpcClient.conf); } catch (IOException e) { failInit(ch, e); return; @@ -285,16 +275,7 @@ public void operationComplete(Future future) throws Exception { private void connect() throws UnknownHostException { assert eventLoop.inEventLoop(); LOG.trace("Connecting to {}", remoteId.getAddress()); - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookups(); - } - InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); - if (remoteAddr.isUnresolved()) { - if (this.rpcClient.metrics != null) { - this.rpcClient.metrics.incrNsLookupsFailed(); - } - throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); - } + InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics); this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass) .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index a3e2bef30c86..912fa4fb0654 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -18,11 +18,14 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.security.SecurityInfo; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider; @@ -122,7 +125,7 @@ protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, Conne this.remoteId = remoteId; } - protected void scheduleTimeoutTask(final Call call) { + protected final void scheduleTimeoutTask(final Call call) { if (call.timeout > 0) { call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() { @@ -137,7 +140,7 @@ public void run(Timeout timeout) throws Exception { } } - protected byte[] getConnectionHeaderPreamble() { + protected final byte[] getConnectionHeaderPreamble() { // Assemble the preamble up in a buffer first and then send it. Writing individual elements, // they are getting sent across piecemeal according to wireshark and then server is messing // up the reading on occasion (the passed in stream is not buffered yet). @@ -153,7 +156,7 @@ protected byte[] getConnectionHeaderPreamble() { return preamble; } - protected ConnectionHeader getConnectionHeader() { + protected final ConnectionHeader getConnectionHeader() { final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder(); builder.setServiceName(remoteId.getServiceName()); final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket); @@ -176,6 +179,21 @@ protected ConnectionHeader getConnectionHeader() { return builder.build(); } + protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics) + throws UnknownHostException { + if (metrics != null) { + metrics.incrNsLookups(); + } + InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress()); + if (remoteAddr.isUnresolved()) { + if (metrics != null) { + metrics.incrNsLookupsFailed(); + } + throw new UnknownHostException(remoteId.getAddress() + " could not be resolved"); + } + return remoteAddr; + } + protected abstract void callTimeout(Call call); public ConnectionId remoteId() {