Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -321,7 +319,7 @@ private int nextCallId() {
* @return A pair with the Message response and the Cell data (if any).
*/
private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcController hrc,
Message param, Message returnType, final User ticket, final InetSocketAddress isa)
Message param, Message returnType, final User ticket, final Address isa)
throws ServiceException {
BlockingRpcCallback<Message> done = new BlockingRpcCallback<>();
callMethod(md, hrc, param, returnType, ticket, isa, done);
Expand Down Expand Up @@ -393,7 +391,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
}

Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress inetAddr,
final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
Expand All @@ -408,7 +406,6 @@ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController
cs.setNumActionsPerServer(numActions);
}

final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
Expand Down Expand Up @@ -525,13 +522,6 @@ private static class AbstractRpcChannel {

protected final Address addr;

// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
// per method call on the channel. If the remote target is removed or reprovisioned and
// its identity changes a new channel with a newly resolved InetSocketAddress will be
// created as part of retry, so caching here is fine.
// Normally, caching an InetSocketAddress is an anti-pattern.
protected InetSocketAddress isa;

protected final AbstractRpcClient<?> rpcClient;

protected final User ticket;
Expand Down Expand Up @@ -582,22 +572,8 @@ protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient, Addre
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
// Look up remote address upon first call
if (isa == null) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
}
}
return rpcClient.callBlockingMethod(md, configureRpcController(controller), param, returnType,
ticket, isa);
ticket, addr);
}
}

Expand All @@ -616,24 +592,9 @@ public void callMethod(Descriptors.MethodDescriptor md, RpcController controller
Message returnType, RpcCallback<Message> done) {
HBaseRpcController configuredController = configureRpcController(
Preconditions.checkNotNull(controller, "RpcController can not be null for async rpc call"));
// Look up remote address upon first call
if (isa == null || isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
controller.setFailed(addr + " could not be resolved");
return;
}
}
// This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions.
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, addr, done);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayDeque;
import java.util.Locale;
Expand All @@ -52,7 +51,6 @@
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
Expand Down Expand Up @@ -265,16 +263,7 @@ protected void setupConnection() throws IOException {
if (this.rpcClient.localAddr != null) {
this.socket.bind(this.rpcClient.localAddr);
}
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
NetUtils.connect(this.socket, remoteAddr, this.rpcClient.connectTO);
this.socket.setSoTimeout(this.rpcClient.readTO);
return;
Expand Down Expand Up @@ -423,15 +412,8 @@ private boolean setupSaslConnection(final InputStream in2, final OutputStream ou
if (this.metrics != null) {
this.metrics.incrNsLookups();
}
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
if (serverAddr.isUnresolved()) {
if (this.metrics != null) {
this.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
saslRpcClient = new HBaseSaslRpcClient(this.rpcClient.conf, provider, token,
serverAddr.getAddress(), securityInfo, this.rpcClient.fallbackAllowed,
socket.getInetAddress(), securityInfo, this.rpcClient.fallbackAllowed,
this.rpcClient.conf.get("hbase.rpc.protection",
QualityOfProtection.AUTHENTICATION.name().toLowerCase(Locale.ROOT)),
this.rpcClient.conf.getBoolean(CRYPTO_AES_ENABLED_KEY, CRYPTO_AES_ENABLED_DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
Expand Down Expand Up @@ -210,18 +209,9 @@ private void saslNegotiate(final Channel ch) {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
final NettyHBaseSaslRpcClientHandler saslHandler;
try {
if (this.metrics != null) {
this.metrics.incrNsLookups();
}
InetSocketAddress serverAddr = Address.toSocketAddress(remoteId.getAddress());
if (serverAddr.isUnresolved()) {
if (this.metrics != null) {
this.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, provider, token,
serverAddr.getAddress(), securityInfo, rpcClient.fallbackAllowed, this.rpcClient.conf);
((InetSocketAddress) ch.remoteAddress()).getAddress(), securityInfo,
rpcClient.fallbackAllowed, this.rpcClient.conf);
} catch (IOException e) {
failInit(ch, e);
return;
Expand Down Expand Up @@ -285,16 +275,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
private void connect() throws UnknownHostException {
assert eventLoop.inEventLoop();
LOG.trace("Connecting to {}", remoteId.getAddress());
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
InetSocketAddress remoteAddr = getRemoteInetAddress(rpcClient.metrics);
this.channel = new Bootstrap().group(eventLoop).channel(rpcClient.channelClass)
.option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.SecurityInfo;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
Expand Down Expand Up @@ -122,7 +125,7 @@ protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, Conne
this.remoteId = remoteId;
}

protected void scheduleTimeoutTask(final Call call) {
protected final void scheduleTimeoutTask(final Call call) {
if (call.timeout > 0) {
call.timeoutTask = timeoutTimer.newTimeout(new TimerTask() {

Expand All @@ -137,7 +140,7 @@ public void run(Timeout timeout) throws Exception {
}
}

protected byte[] getConnectionHeaderPreamble() {
protected final byte[] getConnectionHeaderPreamble() {
// Assemble the preamble up in a buffer first and then send it. Writing individual elements,
// they are getting sent across piecemeal according to wireshark and then server is messing
// up the reading on occasion (the passed in stream is not buffered yet).
Expand All @@ -153,7 +156,7 @@ protected byte[] getConnectionHeaderPreamble() {
return preamble;
}

protected ConnectionHeader getConnectionHeader() {
protected final ConnectionHeader getConnectionHeader() {
final ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setServiceName(remoteId.getServiceName());
final UserInformation userInfoPB = provider.getUserInfo(remoteId.ticket);
Expand All @@ -176,6 +179,21 @@ protected ConnectionHeader getConnectionHeader() {
return builder.build();
}

protected final InetSocketAddress getRemoteInetAddress(MetricsConnection metrics)
throws UnknownHostException {
if (metrics != null) {
metrics.incrNsLookups();
}
InetSocketAddress remoteAddr = Address.toSocketAddress(remoteId.getAddress());
if (remoteAddr.isUnresolved()) {
if (metrics != null) {
metrics.incrNsLookupsFailed();
}
throw new UnknownHostException(remoteId.getAddress() + " could not be resolved");
}
return remoteAddr;
}

protected abstract void callTimeout(Call call);

public ConnectionId remoteId() {
Expand Down