Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause compilation error. Also I'm wondering whether it makes sense to move the DN cache logic from NamenodeBeanMetrics to here and have the former to depend on this. This way we don't have to keep two copies of cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. I was working on an old branch, now it is fixed. I prefer to move the NamenodeBeanMetrics logic by a separate change. Let's make this change cohesive. NamenodeBeanMetrics cached report needs a different API or at least some extra work since it needs a String.

import static org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics.DN_REPORT_CACHE_EXPIRE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
Expand All @@ -41,7 +43,21 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
Expand Down Expand Up @@ -219,6 +235,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private static final ThreadLocal<UserGroupInformation> CUR_USER =
new ThreadLocal<>();

/** DN type -> full DN report. */
private final LoadingCache<DatanodeReportType, DatanodeInfo[]> dnCache;

/**
* Construct a router RPC server.
*
Expand Down Expand Up @@ -361,6 +380,23 @@ public RouterRpcServer(Configuration configuration, Router router,
this.nnProto = new RouterNamenodeProtocol(this);
this.clientProto = new RouterClientProtocol(conf, this);
this.routerProto = new RouterUserProtocol(this);

long dnCacheExpire = conf.getTimeDuration(
DN_REPORT_CACHE_EXPIRE,
DN_REPORT_CACHE_EXPIRE_DEFAULT, TimeUnit.MILLISECONDS);
this.dnCache = CacheBuilder.newBuilder()
.build(new DatanodeReportCacheLoader());

// Actively refresh the dn cache in a configured interval
Executors
.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(() -> this.dnCache
.asMap()
.keySet()
.parallelStream()
.forEach((key) -> this.dnCache.refresh(key)),
0,
dnCacheExpire, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -868,6 +904,50 @@ public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
return clientProto.getDatanodeReport(type);
}

/**
* Get the datanode report from cache.
*
* @param type Type of the datanode.
* @return List of datanodes.
* @throws IOException If it cannot get the report.
*/
public DatanodeInfo[] getCachedDatanodeReport(DatanodeReportType type)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be package-private?

throws IOException {
try {
DatanodeInfo[] dns = this.dnCache.get(type);
if (dns == null) {
LOG.debug("Get null DN report from cache");
dns = getCachedDatanodeReportImpl(type);
this.dnCache.put(type, dns);
}
return dns;
} catch (ExecutionException e) {
LOG.error("Cannot get the DN report for {}", type, e);
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new IOException(cause);
}
}
}

private DatanodeInfo[] getCachedDatanodeReportImpl
(final DatanodeReportType type) throws IOException{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after {

// We need to get the DNs as a privileged user
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
RouterRpcServer.setCurrentUser(loginUser);

try {
DatanodeInfo[] dns = clientProto.getDatanodeReport(type);
LOG.debug("Refresh cached DN report with {} datanodes", dns.length);
return dns;
} finally {
// Reset ugi to remote user for remaining operations.
RouterRpcServer.resetCurrentUser();
}
}

/**
* Get the datanode report with a timeout.
* @param type Type of the datanode.
Expand Down Expand Up @@ -1748,4 +1828,58 @@ public void refreshSuperUserGroupsConfiguration() throws IOException {
public String[] getGroupsForUser(String user) throws IOException {
return routerProto.getGroupsForUser(user);
}
}

/**
* Deals with loading datanode report into the cache and refresh.
*/
private class DatanodeReportCacheLoader
extends CacheLoader<DatanodeReportType, DatanodeInfo[]> {

private ListeningExecutorService executorService;

DatanodeReportCacheLoader() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("DatanodeReport-Cache-Reload")
.setDaemon(true)
.build();

// Only use 1 thread to refresh cache.
// With coreThreadCount == maxThreadCount we effectively
// create a fixed size thread pool. As allowCoreThreadTimeOut
// has been set, all threads will die after 60 seconds of non use.
ThreadPoolExecutor parentExecutor = new ThreadPoolExecutor(
1,
1,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
parentExecutor.allowCoreThreadTimeOut(true);
executorService = MoreExecutors.listeningDecorator(parentExecutor);
}

@Override
public DatanodeInfo[] load(DatanodeReportType type) throws Exception {
return getCachedDatanodeReportImpl(type);
}

/**
* Override the reload method to provide an asynchronous implementation,
* so that the query will not be slowed down by the cache refresh. It
* will return the old cache value and schedule a background refresh.
*/
@Override
public ListenableFuture<DatanodeInfo[]> reload(
final DatanodeReportType type, DatanodeInfo[] oldValue)
throws Exception {
ListenableFuture<DatanodeInfo[]> listenableFuture =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: variable listenableFuture is redundant - you can just return from submit call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comments. I've addressed all of them.

executorService.submit(new Callable<DatanodeInfo[]>() {
@Override
public DatanodeInfo[] call() throws Exception {
return load(type);
}
});
return listenableFuture;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -454,19 +454,12 @@ private URI redirectURI(final Router router, final UserGroupInformation ugi,
private DatanodeInfo chooseDatanode(final Router router,
final String path, final HttpOpParam.Op op, final long openOffset,
final String excludeDatanodes) throws IOException {
// We need to get the DNs as a privileged user
final RouterRpcServer rpcServer = getRPCServer(router);
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
RouterRpcServer.setCurrentUser(loginUser);

DatanodeInfo[] dns = null;
try {
dns = rpcServer.getDatanodeReport(DatanodeReportType.LIVE);
dns = rpcServer.getCachedDatanodeReport(DatanodeReportType.LIVE);
} catch (IOException e) {
LOG.error("Cannot get the datanodes from the RPC server", e);
} finally {
// Reset ugi to remote user for remaining operations.
RouterRpcServer.resetCurrentUser();
}

HashSet<Node> excludes = new HashSet<Node>();
Expand Down