diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index aab18b1bc5c0..460655ba39a1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container.replication; import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.Config; @@ -502,6 +503,7 @@ public void sendThrottledDeleteCommand(final ContainerInfo container, int commandCount = nodeManager.getTotalDatanodeCommandCount(datanode, Type.deleteContainerCommand); if (commandCount >= datanodeDeleteLimit) { + metrics.incrDeleteContainerCmdsDeferredTotal(); throw new CommandTargetOverloadedException("Cannot schedule a delete " + "container command for container " + container.containerID() + " on datanode " + datanode + " as it has too many pending delete " + @@ -533,6 +535,7 @@ public void sendThrottledReplicationCommand(ContainerInfo containerInfo, List> sourceWithCmds = getAvailableDatanodesForReplication(sources); if (sourceWithCmds.isEmpty()) { + metrics.incrReplicateContainerCmdsDeferredTotal(); throw new CommandTargetOverloadedException("No sources with capacity " + "available for replication of container " + containerID + " to " + target); @@ -553,6 +556,7 @@ public void sendThrottledReconstructionCommand(ContainerInfo containerInfo, List> targetWithCmds = getAvailableDatanodesForReplication(targets); if (targetWithCmds.isEmpty()) { + metrics.incrECReconstructionCmdsDeferredTotal(); throw new CommandTargetOverloadedException("No target with capacity " + "available for reconstruction of " + containerInfo.getContainerID()); } @@ -1407,6 +1411,7 @@ public String getServiceName() { return ReplicationManager.class.getSimpleName(); } + @SuppressFBWarnings("IS2_INCONSISTENT_SYNC") public ReplicationManagerMetrics getMetrics() { return metrics; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java index 40922c84af85..9c62761a61ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java @@ -171,6 +171,19 @@ public final class ReplicationManagerMetrics implements MetricsSource { @Metric("Number of EC replicas scheduled for delete which timed out.") private MutableCounterLong ecReplicaDeleteTimeoutTotal; + @Metric("Number of Reconstruct EC Container commands that could not be sent " + + "due to the pending commands on the target datanode") + private MutableCounterLong ecReconstructionCmdsDeferredTotal; + + @Metric("Number of delete container commands that could not be sent due " + + "to the pending commands on the target datanode") + private MutableCounterLong deleteContainerCmdsDeferredTotal; + + @Metric("Number of replicate container commands that could not be sent
due " + + "to the pending commands on all source datanodes") + private MutableCounterLong replicateContainerCmdsDeferredTotal; + + public ReplicationManagerMetrics(ReplicationManager manager) { this.registry = new MetricsRegistry(METRICS_SOURCE_NAME); this.replicationManager = manager; @@ -223,6 +236,9 @@ public void getMetrics(MetricsCollector collector, boolean all) { ecReconstructionCmdsSentTotal.snapshot(builder, all); ecReplicaCreateTimeoutTotal.snapshot(builder, all); ecReplicasDeletedTotal.snapshot(builder, all); + ecReconstructionCmdsDeferredTotal.snapshot(builder, all); + deleteContainerCmdsDeferredTotal.snapshot(builder, all); + replicateContainerCmdsDeferredTotal.snapshot(builder, all); } public void unRegister() { @@ -372,6 +388,18 @@ public void incrEcReconstructionCmdsSentTotal() { this.ecReconstructionCmdsSentTotal.incr(); } + public void incrECReconstructionCmdsDeferredTotal() { + this.ecReconstructionCmdsDeferredTotal.incr(); + } + + public void incrDeleteContainerCmdsDeferredTotal() { + this.deleteContainerCmdsDeferredTotal.incr(); + } + + public void incrReplicateContainerCmdsDeferredTotal() { + this.replicateContainerCmdsDeferredTotal.incr(); + } + public long getEcReplication() { return replicationManager.getContainerReplicaPendingOps() .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD); @@ -417,4 +445,17 @@ public long getEcReplicasCreatedTotal() { public long getEcReplicasDeletedTotal() { return ecReplicasDeletedTotal.value(); } + + public long getEcReconstructionCmdsDeferredTotal() { + return ecReconstructionCmdsDeferredTotal.value(); + } + + public long getDeleteContainerCmdsDeferredTotal() { + return deleteContainerCmdsDeferredTotal.value(); + } + + public long getReplicateContainerCmdsDeferredTotal() { + return replicateContainerCmdsDeferredTotal.value(); + } + } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index e3311edfe705..d9c5e2f60c01 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -82,6 +82,7 @@ import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas; import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicasWithSameOrigin; import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.getNoNodesTestPlacementPolicy; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -193,18 +194,18 @@ public void testPendingOpsClearedWhenStarting() { MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE); containerReplicaPendingOps.scheduleDeleteReplica(ContainerID.valueOf(2), MockDatanodeDetails.randomDatanodeDetails(), 1, Integer.MAX_VALUE); - Assert.assertEquals(1, containerReplicaPendingOps + assertEquals(1, containerReplicaPendingOps .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD)); - Assert.assertEquals(1, containerReplicaPendingOps + assertEquals(1, containerReplicaPendingOps .getPendingOpCount(ContainerReplicaOp.PendingOpType.DELETE)); // Registers against serviceManager and notifies the status has changed. 
enableProcessAll(); // Pending ops should be cleared. - Assert.assertEquals(0, containerReplicaPendingOps + assertEquals(0, containerReplicaPendingOps .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD)); - Assert.assertEquals(0, containerReplicaPendingOps + assertEquals(0, containerReplicaPendingOps .getPendingOpCount(ContainerReplicaOp.PendingOpType.DELETE)); } @@ -216,8 +217,8 @@ public void testOpenContainerSkipped() throws ContainerNotFoundException { addReplicas(container, ContainerReplicaProto.State.OPEN, 1, 2, 3, 4); replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } @Test @@ -231,10 +232,10 @@ public void testUnhealthyOpenContainerClosed() container, repQueue, repReport); Mockito.verify(eventPublisher, Mockito.times(1)) .fireEvent(SCMEvents.CLOSE_CONTAINER, container.containerID()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OPEN_UNHEALTHY)); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } @Test @@ -256,14 +257,14 @@ public void misMatchedReplicasOfRatisContainerShouldBeClosed() Mockito.verify(nodeManager, Mockito.times(3)) .addDatanodeCommand(any(), any()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); /* Though over replicated, this container should not be added to over replicated queue until all replicas are closed. 
*/ - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } @Test @@ -289,10 +290,10 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica() Mockito.verify(nodeManager, Mockito.times(1)) .addDatanodeCommand(any(), any()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } @Test @@ -311,12 +312,12 @@ public void testUnderReplicatedClosedContainerWithOnlyUnhealthyReplicas() replicas.add(unhealthyOnDecommissioning); replicationManager.processContainer(container, repQueue, repReport); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNHEALTHY)); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } /** @@ -343,12 +344,12 @@ public void testQuasiClosedContainerWithExcessUnhealthyReplica() storeContainerAndReplicas(container, replicas); replicationManager.processContainer(container, repQueue, repReport); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(1, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(1, repQueue.overReplicatedQueueSize()); RatisOverReplicationHandler handler = new RatisOverReplicationHandler( ratisPlacementPolicy, replicationManager); @@ -358,9 +359,9 @@ public void testQuasiClosedContainerWithExcessUnhealthyReplica() handler.processAndSendCommands(replicas, Collections.emptyList(), repQueue.dequeueOverReplicatedContainer(), 2); Assert.assertTrue(commandsSent.iterator().hasNext()); - Assert.assertEquals(unhealthy.getDatanodeDetails().getUuid(), + assertEquals(unhealthy.getDatanodeDetails().getUuid(), commandsSent.iterator().next().getKey()); - Assert.assertEquals(SCMCommandProto.Type.deleteContainerCommand, + assertEquals(SCMCommandProto.Type.deleteContainerCommand, commandsSent.iterator().next().getValue().getType()); } @@ -382,12 +383,12 @@ public void testQuasiClosedContainerWithUnhealthyReplicaOnUniqueOrigin() storeContainerAndReplicas(container, replicas); replicationManager.processContainer(container, repQueue, repReport); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(1, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(1, repQueue.overReplicatedQueueSize()); 
RatisOverReplicationHandler handler = new RatisOverReplicationHandler( ratisPlacementPolicy, replicationManager); @@ -401,7 +402,7 @@ public void testQuasiClosedContainerWithUnhealthyReplicaOnUniqueOrigin() // unhealthy replica can't be deleted because it has a unique origin DN Assert.assertNotEquals(unhealthy.getDatanodeDetails().getUuid(), commandsSent.iterator().next().getKey()); - Assert.assertEquals(SCMCommandProto.Type.deleteContainerCommand, + assertEquals(SCMCommandProto.Type.deleteContainerCommand, commandsSent.iterator().next().getValue().getType()); } @@ -413,8 +414,8 @@ public void testHealthyContainer() throws ContainerNotFoundException { replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); } @Test @@ -425,9 +426,9 @@ public void testUnderReplicatedContainer() throws ContainerNotFoundException { replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } @@ -445,7 +446,7 @@ public void testGetContainerReplicationHealthForUnderReplicatedContainer() { ContainerHealthResult result = replicationManager.getContainerReplicationHealth(container, replicas); - Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED, + assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED, result.getHealthState()); // Test the same for a RATIS container @@ -458,7 +459,7 @@ public void testGetContainerReplicationHealthForUnderReplicatedContainer() { result = replicationManager.getContainerReplicationHealth(container, replicas); - Assert.assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED, + assertEquals(ContainerHealthResult.HealthState.UNDER_REPLICATED, result.getHealthState()); } @@ -476,12 +477,12 @@ public void testUnderReplicatedContainerFixedByPending() container, repQueue, repReport); // As the pending replication fixes the under replication, nothing is added // to the under replication list. - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); // As the container is still under replicated, as the pending have not // completed yet, the container is still marked as under-replicated in the // report. - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } @@ -496,11 +497,11 @@ public void testUnderReplicatedAndUnrecoverable() container, repQueue, repReport); // If it is unrecoverable, there is no point in putting it into the under // replication list. It will be checked again on the next RM run. 
- Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.MISSING)); } @@ -519,15 +520,15 @@ public void testUnrecoverableAndEmpty() replicationManager.processContainer(container, repQueue, repReport); // If it is unrecoverable, there is no point in putting it into the under // replication list. It will be checked again on the next RM run. - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.MISSING)); // As it is marked empty in the report, it must have gone through the // empty container handler, indicating is was handled as empty. - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.EMPTY)); } @@ -555,9 +556,9 @@ public void testUnderReplicatedClosedContainerWithUnhealthyReplicas() replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } @@ -586,11 +587,11 @@ public void testUnrecoverableClosedContainerWithUnhealthyReplicas() replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.MISSING)); } @@ -614,9 +615,9 @@ public void testUnrecoverableClosedContainerWithUnhealthyReplicas() replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } @@ -632,11 +633,11 @@ public void testUnderAndOverReplicated() // If it is both under and over replicated, we set it to the most important // state, which is under-replicated. When that is fixed, over replication // will be handled. 
- Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); } @@ -668,11 +669,11 @@ public void testUnderReplicationBlockedByUnhealthyReplicas() // assert that this container is seen as under replicated replicationManager.processContainer(container, repQueue, repReport); - Assert.assertEquals(1, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(1, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - Assert.assertEquals(0, repReport.getStat( + assertEquals(0, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); // now, pass this container to ec under replication handling @@ -689,17 +690,17 @@ public void testUnderReplicationBlockedByUnhealthyReplicas() repQueue.dequeueUnderReplicatedContainer(), 1)); // a delete command should also have been sent for UNHEALTHY replica of // index 1 - Assert.assertEquals(1, commandsSent.size()); + assertEquals(1, commandsSent.size()); Pair> command = commandsSent.iterator().next(); Assertions.assertEquals(SCMCommandProto.Type.deleteContainerCommand, command.getValue().getType()); DeleteContainerCommand deleteCommand = (DeleteContainerCommand) command.getValue(); - Assert.assertEquals(unhealthyReplica1.getDatanodeDetails().getUuid(), + assertEquals(unhealthyReplica1.getDatanodeDetails().getUuid(), command.getKey()); - Assert.assertEquals(container.containerID(), + assertEquals(container.containerID(), ContainerID.valueOf(deleteCommand.getContainerID())); - Assert.assertEquals(unhealthyReplica1.getReplicaIndex(), + assertEquals(unhealthyReplica1.getReplicaIndex(), deleteCommand.getReplicaIndex()); } @@ -711,9 +712,9 @@ public void testOverReplicated() throws ContainerNotFoundException { 1, 2, 3, 4, 5, 5); replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); - Assert.assertEquals(1, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(1, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); } @@ -729,11 +730,11 @@ public void testOverReplicatedFixByPending() clock.millis() + 10000); replicationManager.processContainer( container, repQueue, repReport); - Assert.assertEquals(0, repQueue.underReplicatedQueueSize()); + assertEquals(0, repQueue.underReplicatedQueueSize()); // If the pending replication fixes the over-replication, nothing is added // to the over replication list. 
- Assert.assertEquals(0, repQueue.overReplicatedQueueSize()); - Assert.assertEquals(1, repReport.getStat( + assertEquals(0, repQueue.overReplicatedQueueSize()); + assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.OVER_REPLICATED)); } @@ -772,7 +773,7 @@ public void testUnderReplicationQueuePopulated() { // Get the first message off the queue - it should be underRep0. ContainerHealthResult.UnderReplicatedHealthResult res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(underRep0, res.getContainerInfo()); + assertEquals(underRep0, res.getContainerInfo()); // Now requeue it queue.enqueue(res); @@ -782,7 +783,7 @@ public void testUnderReplicationQueuePopulated() { // and 1 retry. They will have the same weighted redundancy so lesser // retries should come first res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(underRep1, res.getContainerInfo()); + assertEquals(underRep1, res.getContainerInfo()); // Next message is underRep0. It starts with a weighted redundancy of 0 + 1 // retry. The other message on the queue is a decommission only with a @@ -791,18 +792,18 @@ public void testUnderReplicationQueuePopulated() { // one will be next due to having less retries. for (int i = 0; i < 4; i++) { res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(underRep0, res.getContainerInfo()); + assertEquals(underRep0, res.getContainerInfo()); queue.enqueue(res); } res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(decomContainer, res.getContainerInfo()); + assertEquals(decomContainer, res.getContainerInfo()); res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(underRep0, res.getContainerInfo()); + assertEquals(underRep0, res.getContainerInfo()); // Next is the mis-rep container, which has a remaining redundancy of 6. 
res = queue.dequeueUnderReplicatedContainer(); - Assert.assertEquals(misRep, res.getContainerInfo()); + assertEquals(misRep, res.getContainerInfo()); res = queue.dequeueUnderReplicatedContainer(); Assert.assertNull(res); @@ -939,7 +940,7 @@ public void testSendDatanodeReplicateCommand() throws NotLeaderException { .getObject(ReplicationManager.ReplicationManagerConfiguration.class); long expectedDeadline = clock.millis() + rmConf.getEventTimeout() - rmConf.getDatanodeTimeoutOffset(); - Assert.assertEquals(expectedDeadline, command.getDeadline()); + assertEquals(expectedDeadline, command.getDeadline()); List ops = containerReplicaPendingOps.getPendingOps( containerInfo.containerID()); @@ -1005,7 +1006,7 @@ public void testReplicateContainerCommandToTarget() .getObject(ReplicationManager.ReplicationManagerConfiguration.class); long expectedDeadline = clock.millis() + rmConf.getEventTimeout() - rmConf.getDatanodeTimeoutOffset(); - Assert.assertEquals(expectedDeadline, command.getDeadline()); + assertEquals(expectedDeadline, command.getDeadline()); List ops = containerReplicaPendingOps.getPendingOps( containerInfo.containerID()); @@ -1067,6 +1068,8 @@ public void testSendThrottledReplicateContainerCommand() testReplicationCommand(cmdTarget, sourceNodes.keySet(), 0, MockDatanodeDetails.randomDatanodeDetails()); + assertEquals(0, replicationManager.getMetrics() + .getReplicateContainerCmdsDeferredTotal()); } @Test @@ -1112,10 +1115,9 @@ private void testReplicationCommand( Assertions.assertEquals(replicaIndex, cmd.getReplicaIndex()); } - @Test(expected = CommandTargetOverloadedException.class) + @Test public void testSendThrottledReplicateContainerCommandThrowsWhenNoSources() - throws CommandTargetOverloadedException, NodeNotFoundException, - NotLeaderException { + throws NodeNotFoundException { // Reconstruction commands also count toward the limit, so set things up // so that the nodes are at the limit caused by 1 reconstruction command // and the remaining replication commands @@ -1135,8 +1137,14 @@ public void testSendThrottledReplicateContainerCommandThrowsWhenNoSources() DatanodeDetails destination = MockDatanodeDetails.randomDatanodeDetails(); ContainerInfo container = ReplicationTestUtil.createContainerInfo( repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); - replicationManager.sendThrottledReplicationCommand( - container, sourceNodes, destination, 0); + + long overLoadedCount = replicationManager.getMetrics() + .getReplicateContainerCmdsDeferredTotal(); + assertThrows(CommandTargetOverloadedException.class, + () -> replicationManager.sendThrottledReplicationCommand( + container, sourceNodes, destination, 0)); + assertEquals(overLoadedCount + 1, replicationManager.getMetrics() + .getReplicateContainerCmdsDeferredTotal()); } @Test @@ -1161,12 +1169,13 @@ public void testSendThrottledReconstructionCommand() Assertions.assertEquals(1, commandsSent.size()); Pair> cmd = commandsSent.iterator().next(); Assertions.assertEquals(cmdTarget.getUuid(), cmd.getLeft()); + assertEquals(0, replicationManager.getMetrics() + .getEcReconstructionCmdsDeferredTotal()); } - @Test(expected = CommandTargetOverloadedException.class) + @Test public void testSendThrottledReconstructionCommandThrowsWhenNoTargets() - throws CommandTargetOverloadedException, NodeNotFoundException, - NotLeaderException { + throws NodeNotFoundException { int limit = replicationManager.getConfig().getDatanodeReplicationLimit(); int reconstructionWeight = replicationManager.getConfig() .getReconstructionCommandWeight(); @@ -1185,7 
+1194,13 @@ public void testSendThrottledReconstructionCommandThrowsWhenNoTargets() ReconstructECContainersCommand command = createReconstructionCommand( container, MockDatanodeDetails.randomDatanodeDetails(), MockDatanodeDetails.randomDatanodeDetails()); - replicationManager.sendThrottledReconstructionCommand(container, command); + long overLoadedCount = replicationManager.getMetrics() + .getEcReconstructionCmdsDeferredTotal(); + assertThrows(CommandTargetOverloadedException.class, + () -> replicationManager.sendThrottledReconstructionCommand( + container, command)); + assertEquals(overLoadedCount + 1, replicationManager.getMetrics() + .getEcReconstructionCmdsDeferredTotal()); } private ReconstructECContainersCommand createReconstructionCommand( @@ -1216,13 +1231,14 @@ public void testCreateThrottledDeleteContainerCommand() ContainerInfo container = ReplicationTestUtil.createContainerInfo( repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); replicationManager.sendThrottledDeleteCommand(container, 1, target, true); - Assert.assertEquals(commandsSent.size(), 1); + assertEquals(commandsSent.size(), 1); + assertEquals(0, replicationManager.getMetrics() + .getDeleteContainerCmdsDeferredTotal()); } - @Test(expected = CommandTargetOverloadedException.class) + @Test public void testCreateThrottledDeleteContainerCommandThrowsWhenNoSources() - throws CommandTargetOverloadedException, NodeNotFoundException, - NotLeaderException { + throws NodeNotFoundException { int limit = replicationManager.getConfig().getDatanodeDeleteLimit(); Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(), @@ -1232,7 +1248,13 @@ public void testCreateThrottledDeleteContainerCommandThrowsWhenNoSources() DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails(); ContainerInfo container = ReplicationTestUtil.createContainerInfo( repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); - replicationManager.sendThrottledDeleteCommand(container, 1, target, true); + long overLoadedCount = replicationManager.getMetrics() + .getDeleteContainerCmdsDeferredTotal(); + assertThrows(CommandTargetOverloadedException.class, + () -> replicationManager.sendThrottledDeleteCommand( + container, 1, target, true)); + assertEquals(overLoadedCount + 1, replicationManager.getMetrics() + .getDeleteContainerCmdsDeferredTotal()); } @Test @@ -1259,7 +1281,7 @@ public void testExcludedNodes() throws NodeNotFoundException, MockDatanodeDetails.randomDatanodeDetails(), 1); Set excluded = replicationManager.getExcludedNodes(); - Assert.assertEquals(1, excluded.size()); + assertEquals(1, excluded.size()); // dn 3 was at the limit already, so should be added when filtering the // nodes Assert.assertTrue(excluded.contains(dn3)); @@ -1267,13 +1289,13 @@ public void testExcludedNodes() throws NodeNotFoundException, // Trigger an update for dn3, but it should stay in the excluded list as its // count is still at the limit. replicationManager.datanodeCommandCountUpdated(dn3); - Assert.assertEquals(replicationManager.getExcludedNodes().size(), 1); + assertEquals(replicationManager.getExcludedNodes().size(), 1); // Starting maintenance on dn3 increases its limits, so it should no longer // be excluded dn3.setPersistedOpState(ENTERING_MAINTENANCE); replicationManager.datanodeCommandCountUpdated(dn3); - Assert.assertEquals(0, replicationManager.getExcludedNodes().size()); + assertEquals(0, replicationManager.getExcludedNodes().size()); // now sent a reconstruction command. 
It should be sent to dn2, which is // at the lowest count, but this command should push it to the limit and @@ -1282,19 +1304,19 @@ public void testExcludedNodes() throws NodeNotFoundException, container, dn1, dn2); replicationManager.sendThrottledReconstructionCommand(container, command); excluded = replicationManager.getExcludedNodes(); - Assert.assertEquals(1, excluded.size()); + assertEquals(1, excluded.size()); // dn 2 reached the limit from the reconstruction command Assert.assertTrue(excluded.contains(dn2)); // Update received for DN2, it should be cleared from the excluded list. replicationManager.datanodeCommandCountUpdated(dn2); excluded = replicationManager.getExcludedNodes(); - Assert.assertEquals(0, excluded.size()); + assertEquals(0, excluded.size()); // Finally, update received for DN1 - it is not excluded and should not // be added or cause any problems by not being there replicationManager.datanodeCommandCountUpdated(dn1); - Assert.assertEquals(0, excluded.size()); + assertEquals(0, excluded.size()); } @Test
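Note for reviewers: each of the three new *CmdsDeferredTotal counters is incremented immediately before the corresponding CommandTargetOverloadedException is thrown, so a deferred command can be confirmed from the metric as well as from the exception. A minimal sketch of that check for the delete path, reusing the mocked nodeManager, replicationManager, repConfig and ReplicationTestUtil fixtures from TestReplicationManager above (illustrative only, not part of the patch):

    // Illustrative sketch: assumes the TestReplicationManager fixtures above.
    // Put the target datanode at its delete-command limit so the send is deferred.
    int limit = replicationManager.getConfig().getDatanodeDeleteLimit();
    Mockito.when(nodeManager.getTotalDatanodeCommandCount(any(), any()))
        .thenReturn(limit);

    ContainerInfo container = ReplicationTestUtil.createContainerInfo(
        repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20);
    DatanodeDetails target = MockDatanodeDetails.randomDatanodeDetails();

    long before = replicationManager.getMetrics()
        .getDeleteContainerCmdsDeferredTotal();
    // The command is deferred: the exception is thrown and the counter advances.
    assertThrows(CommandTargetOverloadedException.class,
        () -> replicationManager.sendThrottledDeleteCommand(
            container, 1, target, true));
    assertEquals(before + 1, replicationManager.getMetrics()
        .getDeleteContainerCmdsDeferredTotal());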