diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 4ab69a0b7edf..69f0c1d2c802 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -30,7 +30,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.StringJoiner;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +40,7 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -57,6 +57,7 @@
     StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -66,10 +67,6 @@
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.metrics2.MetricsCollector;
-import org.apache.hadoop.metrics2.MetricsInfo;
-import org.apache.hadoop.metrics2.MetricsSource;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -92,7 +89,7 @@
  * that the containers are properly replicated. Replication Manager deals only
  * with Quasi Closed / Closed container.
  */
-public class ReplicationManager implements MetricsSource, SCMService {
+public class ReplicationManager implements SCMService {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(ReplicationManager.class);
@@ -230,6 +227,11 @@ enum MoveResult {
   private long lastTimeToBeReadyInMillis = 0;
   private final Clock clock;
 
+  /**
+   * Replication progress related metrics.
+   */
+  private ReplicationManagerMetrics metrics;
+
   /**
    * Constructs ReplicationManager instance with the given configuration.
    *
@@ -265,6 +267,7 @@ public ReplicationManager(final ConfigurationSource conf,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.metrics = null;
 
     // register ReplicationManager to SCMServiceManager.
     serviceManager.register(this);
@@ -279,10 +282,7 @@ public ReplicationManager(final ConfigurationSource conf,
   @Override
   public synchronized void start() {
     if (!isRunning()) {
-      DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
-          "SCM Replication manager (closed container replication) related "
-              + "metrics",
-          this);
+      metrics = ReplicationManagerMetrics.create(this);
       LOG.info("Starting Replication Monitor Thread.");
       running = true;
       replicationMonitor = new Thread(this::run);
@@ -321,7 +321,7 @@ public synchronized void stop() {
       inflightMove.clear();
       inflightMoveFuture.clear();
       running = false;
-      DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+      metrics.unRegister();
       notifyAll();
     } else {
       LOG.info("Replication Monitor Thread is not running.");
@@ -420,12 +420,20 @@ private void processContainer(ContainerInfo container) {
      */
     updateInflightAction(container, inflightReplication,
         action -> replicas.stream()
-            .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
+            .anyMatch(r -> r.getDatanodeDetails().equals(action.datanode)),
+        () -> metrics.incrNumReplicationCmdsTimeout(),
+        () -> {
+          metrics.incrNumReplicationCmdsCompleted();
+          metrics.incrNumReplicationBytesCompleted(
+              container.getUsedBytes());
+        });
 
     updateInflightAction(container, inflightDeletion,
         action -> replicas.stream()
             .noneMatch(r ->
-                r.getDatanodeDetails().equals(action.datanode)));
+                r.getDatanodeDetails().equals(action.datanode)),
+        () -> metrics.incrNumDeletionCmdsTimeout(),
+        () -> metrics.incrNumDeletionCmdsCompleted());
 
     /*
      * If container is under deleting and all it's replicas are deleted,
@@ -507,10 +515,14 @@ private void processContainer(ContainerInfo container) {
    * @param container Container to update
    * @param inflightActions inflightReplication (or) inflightDeletion
    * @param filter filter to check if the operation is completed
+   * @param timeoutCounter update timeout metrics
+   * @param completedCounter update completed metrics
    */
   private void updateInflightAction(final ContainerInfo container,
       final Map<ContainerID, List<InflightAction>> inflightActions,
-      final Predicate<InflightAction> filter) {
+      final Predicate<InflightAction> filter,
+      final Runnable timeoutCounter,
+      final Runnable completedCounter) {
     final ContainerID id = container.containerID();
     final long deadline = clock.millis() - rmConf.getEventTimeout();
     if (inflightActions.containsKey(id)) {
@@ -528,6 +540,13 @@ private void updateInflightAction(final ContainerInfo container,
             NodeOperationalState.IN_SERVICE;
         if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
           iter.remove();
+
+          if (isTimeout) {
+            timeoutCounter.run();
+          } else if (isCompleted) {
+            completedCounter.run();
+          }
+
           updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
               isNotInService, container, a.datanode, inflightActions);
         }
@@ -1439,6 +1458,9 @@ private void sendReplicateCommand(final ContainerInfo container,
         inflightReplication.computeIfAbsent(id, k -> new ArrayList<>());
     sendAndTrackDatanodeCommand(datanode, replicateCommand,
         action -> inflightReplication.get(id).add(action));
+
+    metrics.incrNumReplicationCmdsSent();
+    metrics.incrNumReplicationBytesTotal(container.getUsedBytes());
   }
 
   /**
@@ -1462,6 +1484,8 @@ private void sendDeleteCommand(final ContainerInfo container,
         inflightDeletion.computeIfAbsent(id, k -> new ArrayList<>());
     sendAndTrackDatanodeCommand(datanode, deleteCommand,
         action -> inflightDeletion.get(id).add(action));
+
+    metrics.incrNumDeletionCmdsSent();
   }
 
   /**
@@ -1547,22 +1571,10 @@ private boolean isOpenContainerHealthy(
         .allMatch(r -> ReplicationManager.compareState(state,
            r.getState()));
   }
 
-  @Override
-  public void getMetrics(MetricsCollector collector, boolean all) {
-    collector.addRecord(ReplicationManager.class.getSimpleName())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_REPLICATION,
-            inflightReplication.size())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
-            inflightDeletion.size())
-        .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
-            inflightMove.size())
-        .endRecord();
-  }
-
   /**
    * Wrapper class to hold the InflightAction with its start time.
    */
-  private static final class InflightAction {
+  static final class InflightAction {
     private final DatanodeDetails datanode;
     private final long time;
@@ -1572,6 +1584,11 @@ private InflightAction(final DatanodeDetails datanode,
       this.datanode = datanode;
       this.time = time;
     }
+
+    @VisibleForTesting
+    public DatanodeDetails getDatanode() {
+      return datanode;
+    }
   }
 
   /**
@@ -1645,35 +1662,6 @@ public int getMaintenanceReplicaMinimum() {
     }
   }
 
-  /**
-   * Metric name definitions for Replication manager.
-   */
-  public enum ReplicationManagerMetrics implements MetricsInfo {
-
-    INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
-    INFLIGHT_DELETION("Tracked inflight container deletion requests."),
-    INFLIGHT_MOVE("Tracked inflight container move requests.");
-
-    private final String desc;
-
-    ReplicationManagerMetrics(String desc) {
-      this.desc = desc;
-    }
-
-    @Override
-    public String description() {
-      return desc;
-    }
-
-    @Override
-    public String toString() {
-      return new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}")
-          .add("name=" + name())
-          .add("description=" + desc)
-          .toString();
-    }
-  }
-
   @Override
   public void notifyStatusChanged() {
     serviceLock.lock();
@@ -1711,4 +1699,21 @@ public boolean shouldRun() {
   public String getServiceName() {
     return ReplicationManager.class.getSimpleName();
   }
+
+  public ReplicationManagerMetrics getMetrics() {
+    return this.metrics;
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightReplication() {
+    return inflightReplication;
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightDeletion() {
+    return inflightDeletion;
+  }
+
+  public Map<ContainerID, List<InflightAction>> getInflightMove() {
+    return inflightMove;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
new file mode 100644
index 000000000000..69b1462d8642
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * Class contains metrics related to ReplicationManager.
+ */
+@Metrics(about = "Replication Manager Metrics", context = OzoneConsts.OZONE)
+public final class ReplicationManagerMetrics {
+
+  public static final String METRICS_SOURCE_NAME =
+      ReplicationManagerMetrics.class.getSimpleName();
+
+  @Metric("Tracked inflight container replication requests.")
+  private MutableGaugeLong inflightReplication;
+
+  @Metric("Tracked inflight container deletion requests.")
+  private MutableGaugeLong inflightDeletion;
+
+  @Metric("Tracked inflight container move requests.")
+  private MutableGaugeLong inflightMove;
+
+  @Metric("Number of replication commands sent.")
+  private MutableCounterLong numReplicationCmdsSent;
+
+  @Metric("Number of replication commands completed.")
+  private MutableCounterLong numReplicationCmdsCompleted;
+
+  @Metric("Number of replication commands timeout.")
+  private MutableCounterLong numReplicationCmdsTimeout;
+
+  @Metric("Number of deletion commands sent.")
+  private MutableCounterLong numDeletionCmdsSent;
+
+  @Metric("Number of deletion commands completed.")
+  private MutableCounterLong numDeletionCmdsCompleted;
+
+  @Metric("Number of deletion commands timeout.")
+  private MutableCounterLong numDeletionCmdsTimeout;
+
+  @Metric("Number of replication bytes total.")
+  private MutableCounterLong numReplicationBytesTotal;
+
+  @Metric("Number of replication bytes completed.")
+  private MutableCounterLong numReplicationBytesCompleted;
+
+  private ReplicationManager replicationManager;
+
+  public ReplicationManagerMetrics(ReplicationManager manager) {
+    this.replicationManager = manager;
+  }
+
+  public static ReplicationManagerMetrics create(ReplicationManager manager) {
+    return DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
+        "SCM Replication manager (closed container replication) related "
+            + "metrics",
+        new ReplicationManagerMetrics(manager));
+  }
+
+  public void unRegister() {
+    DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
+  }
+
+  public void incrNumReplicationCmdsSent() {
+    this.numReplicationCmdsSent.incr();
+  }
+
+  public void incrNumReplicationCmdsCompleted() {
+    this.numReplicationCmdsCompleted.incr();
+  }
+
+  public void incrNumReplicationCmdsTimeout() {
+    this.numReplicationCmdsTimeout.incr();
+  }
+
+  public void incrNumDeletionCmdsSent() {
+    this.numDeletionCmdsSent.incr();
+  }
+
+  public void incrNumDeletionCmdsCompleted() {
+    this.numDeletionCmdsCompleted.incr();
+  }
+
+  public void incrNumDeletionCmdsTimeout() {
+    this.numDeletionCmdsTimeout.incr();
+  }
+
+  public void incrNumReplicationBytesTotal(long bytes) {
+    this.numReplicationBytesTotal.incr(bytes);
+  }
+
+  public void incrNumReplicationBytesCompleted(long bytes) {
+    this.numReplicationBytesCompleted.incr(bytes);
+  }
+
+  public long getInflightReplication() {
+    return replicationManager.getInflightReplication().size();
+  }
+
+  public long getInflightDeletion() {
+    return replicationManager.getInflightDeletion().size();
+  }
+
+  public long getInflightMove() {
+    return replicationManager.getInflightMove().size();
+  }
+
+  public long getNumReplicationCmdsSent() {
+    return this.numReplicationCmdsSent.value();
+  }
+
+  public long getNumReplicationCmdsCompleted() {
+    return this.numReplicationCmdsCompleted.value();
+  }
+
+  public long getNumReplicationCmdsTimeout() {
+    return this.numReplicationCmdsTimeout.value();
+  }
+
+  public long getNumDeletionCmdsSent() {
+    return this.numDeletionCmdsSent.value();
+  }
+
+  public long getNumDeletionCmdsCompleted() {
+    return this.numDeletionCmdsCompleted.value();
+  }
+
+  public long getNumDeletionCmdsTimeout() {
+    return this.numDeletionCmdsTimeout.value();
+  }
+
+  public long getNumReplicationBytesTotal() {
+    return this.numReplicationBytesTotal.value();
+  }
+
+  public long getNumReplicationBytesCompleted() {
+    return this.numReplicationBytesCompleted.value();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 130425886618..d7fcde778146 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -215,7 +215,6 @@ public void testOpenContainer() throws SCMException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.OPEN);
     containerStateManager.loadContainer(container);
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -250,7 +249,6 @@ public void testClosingContainer() throws
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -261,7 +259,6 @@
     }
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -296,7 +293,6 @@ public void testQuasiClosedContainerWithTwoOpenReplica() throws
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
     // Two of the replicas are in OPEN state
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentCloseCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand));
@@ -334,7 +330,6 @@ public void testHealthyQuasiClosedContainer() throws
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
@@ -353,6 +348,7 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
       throws SCMException, ContainerNotFoundException, InterruptedException,
       ContainerReplicaNotFoundException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -375,7 +371,6 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
     // All the QUASI_CLOSED replicas have same originNodeId, so the
     // container will not be closed. ReplicationManager should take no action.
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -386,26 +381,59 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
     containerStateManager.updateContainerReplica(id, unhealthyReplica);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaOne.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
 
     // Now we will delete the unhealthy replica from in-memory.
     containerStateManager.removeContainerReplica(id, replicaOne);
 
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
+
     // The container is under replicated as unhealthy replica is removed
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     // We should get replicate command
     Assert.assertEquals(currentReplicateCommandCount + 1,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100L,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
+
+    // Now we add the missing replica back
+    DatanodeDetails targetDn = replicationManager.getInflightReplication()
+        .get(id).get(0).getDatanode();
+    final ContainerReplica replicatedReplicaOne = getReplicas(
+        id, State.CLOSED, 1000L, originNodeId, targetDn);
+    containerStateManager.updateContainerReplica(id, replicatedReplicaOne);
+
+    final long currentReplicationCommandCompleted = replicationManager
+        .getMetrics().getNumReplicationCmdsCompleted();
+    final long currentBytesCompleted = replicationManager.getMetrics()
+        .getNumReplicationBytesCompleted();
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightReplication());
+    Assert.assertEquals(currentReplicationCommandCompleted + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+    Assert.assertEquals(currentBytesCompleted + 100L,
+        replicationManager.getMetrics().getNumReplicationBytesCompleted());
   }
 
   /**
@@ -437,10 +465,38 @@ public void testOverReplicatedQuasiClosedContainer() throws
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
+
+    // Now we remove the replica according to inflight
+    DatanodeDetails targetDn = replicationManager.getInflightDeletion()
+        .get(id).get(0).getDatanode();
+    if (targetDn.equals(replicaOne.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaOne);
+    } else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaTwo);
+    } else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaThree);
+    } else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
+      containerStateManager.removeContainerReplica(id, replicaFour);
+    }
+
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
   }
 
   /**
@@ -474,13 +530,31 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaOne.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
+
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
+    // Now we remove the replica to simulate deletion complete
+    containerStateManager.removeContainerReplica(id, replicaOne);
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   /**
@@ -491,6 +565,7 @@ public void testUnderReplicatedQuasiClosedContainer() throws
       SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -504,13 +579,44 @@ public void testUnderReplicatedQuasiClosedContainer() throws
     final int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + 1,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
+
+    final long currentReplicateCommandCompleted = replicationManager
+        .getMetrics().getNumReplicationCmdsCompleted();
+    final long currentReplicateBytesCompleted = replicationManager
+        .getMetrics().getNumReplicationBytesCompleted();
+
+    // Now we add the replicated new replica
+    DatanodeDetails targetDn = replicationManager.getInflightReplication()
+        .get(id).get(0).getDatanode();
+    final ContainerReplica replicatedReplicaThree = getReplicas(
+        id, State.CLOSED, 1000L, originNodeId, targetDn);
+    containerStateManager.updateContainerReplica(id, replicatedReplicaThree);
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+
+    Assert.assertEquals(currentReplicateCommandCompleted + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsCompleted());
+    Assert.assertEquals(currentReplicateBytesCompleted + 100,
+        replicationManager.getMetrics().getNumReplicationBytesCompleted());
+    Assert.assertEquals(0, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightReplication());
   }
 
   /**
@@ -554,7 +660,6 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     GenericTestUtils.waitFor(
         () -> (currentReplicateCommandCount + 1) == datanodeCommandHandler
             .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand),
@@ -580,7 +685,6 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
      */
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
@@ -588,9 +692,16 @@
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaTwo.getDatanodeDetails()));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
 
     containerStateManager.removeContainerReplica(id, replicaTwo);
 
+    final long currentDeleteCommandCompleted = replicationManager.getMetrics()
+        .getNumDeletionCmdsCompleted();
     /*
      * We have now removed unhealthy replica, next iteration of
      * ReplicationManager should re-replicate the container as it
@@ -598,11 +709,22 @@
      */
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
+
+    Assert.assertEquals(0, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(0, replicationManager.getMetrics()
+        .getInflightDeletion());
+    Assert.assertEquals(currentDeleteCommandCompleted + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsCompleted());
+
     Assert.assertEquals(currentReplicateCommandCount + 2,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 2,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
   }
@@ -630,7 +752,6 @@ public void testQuasiClosedToClosed() throws
         .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     // All the replicas have same BCSID, so all of them will be closed.
@@ -660,7 +781,6 @@ public void testHealthyClosedContainer()
     }
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
   }
@@ -688,7 +808,6 @@ public void testUnhealthyOpenContainer()
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Mockito.verify(closeContainerHandler, Mockito.times(1))
         .onMessage(id, eventQueue);
@@ -710,6 +829,7 @@ public void testGeneratedConfig() {
   public void additionalReplicaScheduledWhenMisReplicated()
       throws SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    container.setUsedBytes(100);
     final ContainerID id = container.containerID();
     final UUID originNodeId = UUID.randomUUID();
     final ContainerReplica replicaOne = getReplicas(
@@ -736,15 +856,23 @@ public void additionalReplicaScheduledWhenMisReplicated()
     int currentReplicateCommandCount = datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+    final long currentBytesToReplicate = replicationManager.getMetrics()
+        .getNumReplicationBytesTotal();
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
-    // At this stage, due to the mocked calls to validteContainerPlacement
-    // the mis-replicated racks will not have improved, so expect to see nothing
-    // scheduled.
+    // At this stage, due to the mocked calls to validateContainerPlacement
+    // the policy will not be satisfied, and replication will be triggered.
+
     Assert.assertEquals(currentReplicateCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + 1,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(currentBytesToReplicate + 100,
+        replicationManager.getMetrics().getNumReplicationBytesTotal());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
 
     // Now make it so that all containers seem mis-replicated no matter how
     // many replicas. This will test replicas are not scheduled if the new
@@ -760,13 +888,17 @@ public void additionalReplicaScheduledWhenMisReplicated()
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
-    // At this stage, due to the mocked calls to validteContainerPlacement
+    // At this stage, due to the mocked calls to validateContainerPlacement
     // the mis-replicated racks will not have improved, so expect to see nothing
    // scheduled.
     Assert.assertEquals(currentReplicateCommandCount, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightReplication().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightReplication());
   }
 
   @Test
@@ -806,17 +938,21 @@ public void overReplicatedButRemovingMakesMisReplicated()
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
 
     // The unhealthy replica should be removed, but not the other replica
-    // as each time we test with 3 replicas, Mockitor ensures it returns
+    // as each time we test with 3 replicas, Mockito ensures it returns
     // mis-replicated
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
     Assert.assertTrue(datanodeCommandHandler.received(
         SCMCommandProto.Type.deleteContainerCommand,
         replicaFive.getDatanodeDetails()));
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   @Test
@@ -850,10 +986,14 @@ public void testOverReplicatedAndPolicySatisfied() throws
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 1,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   @Test
@@ -890,10 +1030,14 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 2,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
   }
 
   /**
@@ -1057,10 +1201,14 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentDeleteCommandCount + 2, datanodeCommandHandler
         .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + 2,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
+    Assert.assertEquals(1, replicationManager.getInflightDeletion().size());
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getInflightDeletion());
     // Get the DECOM and Maint replica and ensure none of them are scheduled
     // for removal
     Set<ContainerReplica> decom =
@@ -1341,6 +1489,31 @@ public void testReplicateCommandTimeout() throws
     // scheduled
     clock.fastForward(timeout + 1000);
     assertReplicaScheduled(1);
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getNumReplicationCmdsTimeout());
+  }
+
+  @Test
+  public void testDeleteCommandTimeout() throws
+      SCMException, InterruptedException {
+    long timeout = new ReplicationManagerConfiguration().getEventTimeout();
+
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    assertDeleteScheduled(1);
+
+    // Already a pending delete, so nothing scheduled
+    assertReplicaScheduled(0);
+
+    // Advance the clock past the timeout, and there should be a delete
+    // scheduled
+    clock.fastForward(timeout + 1000);
+    assertDeleteScheduled(1);
+    Assert.assertEquals(1, replicationManager.getMetrics()
+        .getNumDeletionCmdsTimeout());
   }
 
   private ContainerInfo createContainer(LifeCycleState containerState)
@@ -1393,11 +1566,25 @@ private void assertReplicaScheduled(int delta) throws InterruptedException {
         .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
 
     replicationManager.processAll();
-    // Wait for EventQueue to call the event handler
     eventQueue.processAll(1000);
     Assert.assertEquals(currentReplicateCommandCount + delta,
         datanodeCommandHandler.getInvocationCount(
             SCMCommandProto.Type.replicateContainerCommand));
+    Assert.assertEquals(currentReplicateCommandCount + delta,
+        replicationManager.getMetrics().getNumReplicationCmdsSent());
+  }
+
+  private void assertDeleteScheduled(int delta) throws InterruptedException {
+    final int currentDeleteCommandCount = datanodeCommandHandler
+        .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
+
+    replicationManager.processAll();
+    eventQueue.processAll(1000);
+    Assert.assertEquals(currentDeleteCommandCount + delta,
+        datanodeCommandHandler.getInvocationCount(
+            SCMCommandProto.Type.deleteContainerCommand));
+    Assert.assertEquals(currentDeleteCommandCount + delta,
+        replicationManager.getMetrics().getNumDeletionCmdsSent());
   }
 
   @After