Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand All @@ -41,7 +43,19 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
Expand Down Expand Up @@ -219,6 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private static final ThreadLocal<UserGroupInformation> CUR_USER =
new ThreadLocal<>();

/** DN type -> full DN report. */
private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;

/**
* Construct a router RPC server.
*
Expand Down Expand Up @@ -361,6 +378,23 @@ public RouterRpcServer(Configuration configuration, Router router,
this.nnProto = new RouterNamenodeProtocol(this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);

long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS);
this.dnCache = CacheBuilder.newBuilder()
.build(new DatanodeReportCacheLoader());

// Actively refresh the dn cache in a configured interval
Executors
.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(() -> this.dnCache
.asMap()
.keySet()
.parallelStream()
.forEach((key) -> this.dnCache.refresh(key)),
0,
dnCacheExpire, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -868,6 +902,50 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
return clientProto.getDatanodeReport(type);
}

/**
* Get the datanode report from cache.
*
* @param type Type of the datanode.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
throws IOException {
try {
DatanodeInfo[] dns = this.dnCache.get(type);
if (dns == null) {
LOG.debug("Get null DN report from cache");
dns = getCachedDatanodeReportImpl(type);
this.dnCache.put(type, dns);
}
return dns;
} catch (ExecutionException e) {
LOG.error("Cannot get the DN report for {}", type, e);
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new IOException(cause);
}
}
}

private DatanodeInfo[] getCachedDatanodeReportImpl
(final DatanodeReportType type) throws IOException {
// We need to get the DNs as a privileged user
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
RouterRpcServer.setCurrentUser(loginUser);

try {
DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
return dns;
} finally {
// Reset ugi to remote user for remaining operations.
RouterRpcServer.resetCurrentUser();
}
}

/**
* Get the datanode report with a timeout.
* @param type Type of the datanode.
Expand Down Expand Up @@ -1748,4 +1826,45 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
public String[] getGroupsForUser(String user) throws IOException {
return routerProto.getGroupsForUser(user);
}
}

/**
* Deals with loading datanode report into the cache and refresh.
*/
private class DatanodeReportCacheLoader
extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {

private ListeningExecutorService executorService;

DatanodeReportCacheLoader() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("DatanodeReport-Cache-Reload")
.setDaemon(true)
.build();

executorService = MoreExecutors.listeningDecorator(
Executors.newSingleThreadExecutor(threadFactory));
}

@Override
public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
return getCachedDatanodeReportImpl(type);
}

/**
* Override the reload method to provide an asynchronous implementation,
* so that the query will not be slowed down by the cache refresh. It
* will return the old cache value and schedule a background refresh.
*/
@Override
public ListenableFuture<DatanodeInfo[]> reload(
final DatanodeReportType type, DatanodeInfo[] oldValue)
throws Exception {
return executorService.submit(new Callable<DatanodeInfo[]>() {
@Override
public DatanodeInfo[] call() throws Exception {
return load(type);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -454,19 +454,12 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi,
private DatanodeInfo chooseDatanode(final Router router,
final String path, final HttpOpParam.Op op, final long openOffset,
final String excludeDatanodes) throws IOException {
// We need to get the DNs as a privileged user
final RouterRpcServer rpcServer = getRPCServer(router);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
RouterRpcServer.setCurrentUser(loginUser);

DatanodeInfo[] dns = null;
try {
dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
} finally {
// Reset ugi to remote user for remaining operations.
RouterRpcServer.resetCurrentUser();
}

HashSet<Node> excludes = new HashSet<Node>();
Expand Down