Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,10 @@ public interface FederationRPCMBean {
* @return Number of operations rejected due to lack of permits.
*/
long getProxyOpPermitRejected();

/**
* Get the number of operations rejected due to lack of permits of each namespace.
* @return Number of operations rejected due to lack of permits of each namespace.
*/
String getProxyOpPermitRejectedPerNs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,9 @@ public void incrProxyOpPermitRejected() {
public long getProxyOpPermitRejected() {
return proxyOpPermitRejected.value();
}

@Override
public String getProxyOpPermitRejectedPerNs() {
return rpcServer.getRPCClient().getRejectedPermitsPerNsJSON();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -134,6 +136,7 @@ public class RouterRpcClient {

/** Fairness manager to control handlers assigned per NS. */
private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
private Map<String, LongAdder> rejectedPermitsPerNs = new ConcurrentHashMap<>();

/**
* Create a router RPC client to manage remote procedure calls to NNs.
Expand Down Expand Up @@ -319,6 +322,15 @@ public String getAsyncCallerPoolJson() {
return JSON.toString(info);
}

/**
* JSON representation of the rejected permits for each nameservice.
*
* @return String representation of the rejected permits for each nameservice.
*/
public String getRejectedPermitsPerNsJSON() {
return JSON.toString(rejectedPermitsPerNs);
}

/**
* Get ClientProtocol proxy client for a NameNode. Each combination of user +
* NN must use a unique proxy client. Previously created clients are cached
Expand Down Expand Up @@ -1544,6 +1556,7 @@ private void acquirePermit(
if (rpcMonitor != null) {
rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
}
incrRejectedPermitForNs(nsId);
LOG.debug("Permit denied for ugi: {} for method: {}",
ugi, m.getMethodName());
String msg =
Expand Down Expand Up @@ -1576,4 +1589,13 @@ private void releasePermit(
return (AbstractRouterRpcFairnessPolicyController
)routerRpcFairnessPolicyController;
}

private void incrRejectedPermitForNs(String ns) {
rejectedPermitsPerNs.computeIfAbsent(ns, k -> new LongAdder()).increment();
}

public Long getRejectedPermitForNs(String ns) {
return rejectedPermitsPerNs.containsKey(ns) ?
rejectedPermitsPerNs.get(ns).longValue() : 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
Expand Down Expand Up @@ -267,6 +269,14 @@ public DFSClient getClient() throws IOException, URISyntaxException {
public Configuration getConf() {
return conf;
}

public RouterRpcServer getRouterRpcServer() {
return router.getRpcServer();
}

public RouterRpcClient getRouterRpcClient() {
return getRouterRpcServer().getRPCClient();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
Configuration conf = new HdfsConfiguration();
final int numOps = 10;
final AtomicInteger overloadException = new AtomicInteger();
int originalRejectedPermits = getTotalRejectedPermits(routerContext);

for (int i = 0; i < numOps; i++) {
DFSClient routerClient = null;
Expand Down Expand Up @@ -177,6 +178,9 @@ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
}
overloadException.get();
}
int latestRejectedPermits = getTotalRejectedPermits(routerContext);
assertEquals(latestRejectedPermits - originalRejectedPermits,
overloadException.get());

if (fairness) {
assertTrue(overloadException.get() > 0);
Expand Down Expand Up @@ -208,4 +212,14 @@ private void invokeConcurrent(ClientProtocol routerProto, String clientName)
routerProto.renewLease(clientName);
}

private int getTotalRejectedPermits(RouterContext routerContext) {
int totalRejectedPermits = 0;
for (String ns : cluster.getNameservices()) {
totalRejectedPermits += routerContext.getRouterRpcClient()
.getRejectedPermitForNs(ns);
}
totalRejectedPermits += routerContext.getRouterRpcClient()
.getRejectedPermitForNs(RouterRpcFairnessConstants.CONCURRENT_NS);
return totalRejectedPermits;
}
}