Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3142,6 +3142,7 @@ private void requeueCall(Call call)
throws IOException, InterruptedException {
try {
internalQueueCall(call, false);
rpcMetrics.incrRequeueCalls();
} catch (RpcServerException rse) {
call.doResponse(rse.getCause(), rse.getRpcStatusProto());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcClientBackoff;
@Metric("Number of Slow RPC calls")
MutableCounterLong rpcSlowCalls;
@Metric("Number of requeue calls")
MutableCounterLong rpcRequeueCalls;

@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
Expand Down Expand Up @@ -304,6 +306,13 @@ public void incrSlowRpc() {
rpcSlowCalls.incr();
}

/**
* Increments the Requeue Calls counter.
*/
public void incrRequeueCalls() {
rpcRequeueCalls.incr();
}

/**
* Returns a MutableRate Counter.
* @return Mutable Rate
Expand Down Expand Up @@ -344,6 +353,15 @@ public long getRpcSlowCalls() {
return rpcSlowCalls.value();
}

/**
* Returns the number of requeue calls.
* @return long
*/
@VisibleForTesting
public long getRpcRequeueCalls() {
return rpcRequeueCalls.value();
}

public MutableRate getDeferredRpcProcessingTime() {
return deferredRpcProcessingTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -36,7 +37,10 @@
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -61,9 +65,12 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.After;
Expand Down Expand Up @@ -124,6 +131,43 @@ public static void shutDownCluster() throws IOException {
}
}

@Test
public void testObserverRequeue() throws Exception {
ScheduledExecutorService interruptor =
Executors.newScheduledThreadPool(1);

FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster
.getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
try {
// Stop EditlogTailer of Observer NameNode.
observerFsNS.getEditLogTailer().stop();
long oldRequeueNum = obRpcMetrics.getRpcRequeueCalls();
ScheduledFuture<FileStatus> scheduledFuture = interruptor.schedule(
() -> {
Path tmpTestPath = new Path("/TestObserverRequeue");
dfs.create(tmpTestPath, (short)1).close();
assertSentTo(0);
// This operation will be blocked in ObserverNameNode
// until EditlogTailer tailed edits from journalNode.
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
assertSentTo(2);
return fileStatus;
}, 0, TimeUnit.SECONDS);

GenericTestUtils.waitFor(() -> obRpcMetrics.getRpcRequeueCalls() > oldRequeueNum,
50, 10000);

observerFsNS.getEditLogTailer().doTailEdits();
FileStatus fileStatus = scheduledFuture.get(10000, TimeUnit.MILLISECONDS);
assertNotNull(fileStatus);
} finally {
EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
observerFsNS.setEditLogTailerForTests(editLogTailer);
editLogTailer.start();
}
}

@Test
public void testNoActiveToObserver() throws Exception {
try {
Expand Down