Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Comment on lines +31 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: same here, no longer in use.

import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -79,8 +81,6 @@ class AsyncConnectionImpl implements AsyncConnection {
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

private final Configuration conf;

final AsyncConnectionConfiguration connConf;
Expand All @@ -95,8 +95,6 @@ class AsyncConnectionImpl implements AsyncConnection {

final RpcControllerFactory rpcControllerFactory;

private final boolean hostnameCanChange;

private final AsyncRegionLocator locator;

final AsyncRpcRetryingCallerFactory callerFactory;
Expand Down Expand Up @@ -142,7 +140,6 @@ public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, Stri
this.rpcClient = RpcClientFactory.createClient(
conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
(int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
Expand Down Expand Up @@ -264,7 +261,7 @@ private ClientService.Interface createRegionServerStub(ServerName serverName) th

ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
getStubKey(ClientService.getDescriptor().getName(), serverName, hostnameCanChange),
getStubKey(ClientService.getDescriptor().getName(), serverName),
() -> createRegionServerStub(serverName));
}

Expand All @@ -278,7 +275,7 @@ private AdminService.Interface createAdminServerStub(ServerName serverName) thro

AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
getStubKey(AdminService.getDescriptor().getName(), serverName, hostnameCanChange),
getStubKey(AdminService.getDescriptor().getName(), serverName),
() -> createAdminServerStub(serverName));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Comment on lines 27 to +28
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: both InetAddress and InetSocketAddress are no longer in use, imports can be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. There are going to be other checkstyle nits too, will look at the report and fix them all.

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -128,32 +129,17 @@ public static void setServerSideHConnectionRetriesConfig(final Configuration c,
}

/**
* Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
* Get a unique key for the rpc stub to the given server.
*/
static int retries2Attempts(int retries) {
return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
static String getStubKey(String serviceName, ServerName serverName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As said before, I wonder what is the problem if we just use host:port directly here? In the past I think the problem is that we will not resolve again when connecting, for now, I think the problem has been solved?

return String.format("%s@%s", serviceName, serverName);
}

/**
* Get a unique key for the rpc stub to the given server.
* Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
*/
static String getStubKey(String serviceName, ServerName serverName, boolean hostnameCanChange) {
// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
String hostname = serverName.getHostname();
int port = serverName.getPort();
if (hostnameCanChange) {
try {
InetAddress ip = InetAddress.getByName(hostname);
return serviceName + "@" + hostname + "-" + ip.getHostAddress() + ":" + port;
} catch (UnknownHostException e) {
LOG.warn("Can not resolve " + hostname + ", please check your network", e);
}
}
return serviceName + "@" + hostname + ":" + port;
static int retries2Attempts(int retries) {
return Math.max(1, retries == Integer.MAX_VALUE ? Integer.MAX_VALUE : retries + 1);
}

static void checkHasFamilies(Mutation mutation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class MetricsConnection implements StatisticTrackable {
private static final String HEAP_BASE = "heapOccupancy_";
private static final String CACHE_BASE = "cacheDroppingExceptions_";
private static final String UNKNOWN_EXCEPTION = "UnknownException";
private static final String NS_LOOKUPS = "nsLookups";
private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

/** A container class for collecting details about the RPC call as it percolates. */
Expand Down Expand Up @@ -287,6 +289,8 @@ private static interface NewMetric<T> {
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Histogram numActionsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;

// dynamic metrics

Expand Down Expand Up @@ -349,6 +353,8 @@ protected Ratio getRatio() {
"concurrentCallsPerServer", scope));
this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
"numActionsPerServer", scope));
this.nsLookups = registry.counter(name(this.getClass(), NS_LOOKUPS, scope));
this.nsLookupsFailed = registry.counter(name(this.getClass(), NS_LOOKUPS_FAILED, scope));

this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
Expand Down Expand Up @@ -517,4 +523,12 @@ public void incrCacheDroppingExceptions(Object exception) {
(exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
cacheDroppingExceptions, counterFactory).inc();
}

public void incrNsLookups() {
this.nsLookups.inc();
}

public void incrNsLookupsFailed() {
this.nsLookupsFailed.inc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -134,10 +135,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC

private int maxConcurrentCallsPerServer;

private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good. When implementing an in-house rpc framework in the past, I used to use InetSocketAddress.createUnresolved. But it has a problem that usually a network framework will not accept a unresolved InetSocketAddress so if you forget to recreate a resolved one you will get exception. Since here we have a special structure, I think it is good to make use it to explicitly say that, here we do not want a resolve yet.

CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
@Override public AtomicInteger load(InetSocketAddress key) throws Exception {
build(new CacheLoader<Address, AtomicInteger>() {
@Override public AtomicInteger load(Address key) throws Exception {
return new AtomicInteger(0);
}
});
Expand Down Expand Up @@ -206,7 +207,7 @@ private void cleanupIdleConnections() {
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
LOG.trace("Cleanup idle connection to {}", conn.remoteId().getAddress());
}
connections.remove(conn.remoteId(), conn);
conn.cleanupConnection();
Expand Down Expand Up @@ -343,11 +344,11 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont
private T getConnection(ConnectionId remoteId) throws IOException {
if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + remoteId.address
LOG.debug("Not trying to connect to " + remoteId.getAddress()
+ " this server is in the failed servers list");
}
throw new FailedServerException(
"This server is in the failed servers list: " + remoteId.address);
"This server is in the failed servers list: " + remoteId.getAddress());
}
T conn;
synchronized (connections) {
Expand All @@ -365,7 +366,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
*/
protected abstract T createConnection(ConnectionId remoteId) throws IOException;

private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
Expand All @@ -390,8 +391,8 @@ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress
}

Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
final RpcCallback<Message> callback) {
final Message param, Message returnType, final User ticket,
final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change this to use Address directly? And we could also remove the UnknownHostException from the createAddr method then which could makes the createRpcChannel and createBlockingRpcChannel not throw IOException, which will be very good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me check.

What we want to avoid is making an ISA for every Call.

Will try this and get back to you.

Copy link
Contributor Author

@apurtell apurtell Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't do this or else there will be a new InetSocketAddress() for every callMethod(), which may cause a DNS lookup per call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done. Lookup done upon first call and then cached there; or else an exception or failure indication is returned at that time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can not view the code now but IIRC, on this execution path, we will use a ConnectionId to get a RpcConnection and then use it to send the rpc call? Then I think we could put the actual resolving in the connect method? Before connecting we could always use the Address class to represent the remote address.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked the code, I think we could avoid creating an InetSocketAddress everytime here, we just need to change more classes to make use of Address instead of InetSocketAddress, such as ConnetionId, FailedServers, as well as the RpcClient interface. And the resolving of the actual address could be delayed to NettyRpcConnection.connect and BlockingRpcConnection.setupConnection, where we really want to connect to the remote side. And once the connection has been established, and it has not been closed because of error or idle for too long, we do not need to involve InetSocketAddress again. I think it is OK?

final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

Expand All @@ -405,6 +406,7 @@ Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController
cs.setNumActionsPerServer(numActions);
}

final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
Expand All @@ -429,12 +431,8 @@ public void run(Call call) {
return call;
}

InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
if (addr.isUnresolved()) {
throw new UnknownHostException("can not resolve " + sn.getServerName());
}
return addr;
private static Address createAddr(ServerName sn) {
return Address.fromParts(sn.getHostname(), sn.getPort());
}

/**
Expand All @@ -449,8 +447,8 @@ public void cancelConnections(ServerName sn) {
synchronized (connections) {
for (T connection : connections.values()) {
ConnectionId remoteId = connection.remoteId();
if (remoteId.address.getPort() == sn.getPort()
&& remoteId.address.getHostName().equals(sn.getHostname())) {
if (remoteId.getAddress().getPort() == sn.getPort()
&& remoteId.getAddress().getHostName().equals(sn.getHostname())) {
LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
+ connection.remoteId);
connections.remove(remoteId, connection);
Expand Down Expand Up @@ -509,27 +507,33 @@ public void close() {

@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
int rpcTimeout) throws UnknownHostException {
int rpcTimeout) {
return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
}

@Override
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
throws UnknownHostException {
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}

private static class AbstractRpcChannel {

protected final InetSocketAddress addr;
protected final Address addr;

// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
// per method call on the channel. If the remote target is removed or reprovisioned and
// its identity changes a new channel with a newly resolved InetSocketAddress will be
// created as part of retry, so caching here is fine.
// Normally, caching an InetSocketAddress is an anti-pattern.
protected InetSocketAddress isa;

protected final AbstractRpcClient<?> rpcClient;

protected final User ticket;

protected final int rpcTimeout;

protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr,
User ticket, int rpcTimeout) {
this.addr = addr;
this.rpcClient = rpcClient;
Expand Down Expand Up @@ -566,15 +570,29 @@ public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
implements BlockingRpcChannel {

protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
InetSocketAddress addr, User ticket, int rpcTimeout) {
Address addr, User ticket, int rpcTimeout) {
super(rpcClient, addr, ticket, rpcTimeout);
}

@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
// Look up remote address upon first call
if (isa == null) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
}
}
return rpcClient.callBlockingMethod(md, configureRpcController(controller),
param, returnType, ticket, addr);
param, returnType, ticket, isa);
}
}

Expand All @@ -584,20 +602,35 @@ public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController
public static class RpcChannelImplementation extends AbstractRpcChannel implements
RpcChannel {

protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
User ticket, int rpcTimeout) throws UnknownHostException {
protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
User ticket, int rpcTimeout) {
super(rpcClient, addr, ticket, rpcTimeout);
}

@Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType, RpcCallback<Message> done) {
HBaseRpcController configuredController =
configureRpcController(Preconditions.checkNotNull(controller,
"RpcController can not be null for async rpc call"));
// Look up remote address upon first call
if (isa == null || isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
controller.setFailed(addr + " could not be resolved");
return;
}
}
// This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions.
this.rpcClient.callMethod(md,
configureRpcController(Preconditions.checkNotNull(controller,
"RpcController can not be null for async rpc call")),
param, returnType, ticket, addr, done);
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
}
}
}
Loading