diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java index 26368b46e463..590d1f12e47a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java @@ -269,4 +269,14 @@ public boolean isHealthy() { .allMatch(r -> ReplicationManager.compareState( container.getState(), r.getState())); } + + /** + * Returns true if there are no replicas of the container available, i.e. the + * set of container replicas passed in the constructor has zero entries. + * + * @return true if there are no replicas, false otherwise. + */ + public boolean isMissing() { + return replica.size() == 0; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 47842c0ad594..224b1f9b8877 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -87,6 +87,7 @@ import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.hdds.utils.db.Table; import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState; import com.google.protobuf.GeneratedMessage; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; @@ -254,6 +255,11 @@ public enum MoveResult { */ private final MoveScheduler moveScheduler; + /** + * Report object that is refreshed each time the Replication Manager runs. + */ + private ReplicationManagerReport containerReport; + /** * Constructs ReplicationManager instance with the given configuration. * @@ -286,6 +292,7 @@ public ReplicationManager(final ConfigurationSource conf, this.inflightMoveFuture = new ConcurrentHashMap<>(); this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum(); this.clock = clock; + this.containerReport = new ReplicationManagerReport(); this.waitTimeInMillis = conf.getTimeDuration( HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, @@ -366,13 +373,21 @@ public synchronized void processAll() { final long start = clock.millis(); final List<ContainerInfo> containers = containerManager.getContainers(); - containers.forEach(this::processContainer); - + ReplicationManagerReport report = new ReplicationManagerReport(); + for (ContainerInfo c : containers) { + processContainer(c, report); + } + report.setComplete(); + containerReport = report; LOG.info("Replication Monitor Thread took {} milliseconds for" + " processing {} containers.", clock.millis() - start, containers.size()); } + public ReplicationManagerReport getContainerReport() { + return containerReport; + } + /** * ReplicationMonitor thread runnable. This wakes up at configured * interval and processes all the containers in the system.
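(Reviewer note: a minimal sketch, not part of the patch, of how a caller holding a ReplicationManager reference might consume the report API added above. The ReportPrinter class and its wiring are hypothetical, but every method it calls — getContainerReport(), getReportTimeStamp(), getStat() and getSample() — is defined in this change.)

```java
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState;

public final class ReportPrinter {

  private ReportPrinter() {
  }

  // Dump the counters captured by the most recent ReplicationManager run.
  public static void print(ReplicationManager rm) {
    ReplicationManagerReport report = rm.getContainerReport();
    System.out.println("Report completed at " + report.getReportTimeStamp());
    // One counter per lifecycle state; these should sum to the total
    // number of containers in SCM.
    for (LifeCycleState state : LifeCycleState.values()) {
      System.out.println(state + ": " + report.getStat(state));
    }
    // Health state counters, plus up to SAMPLE_LIMIT sampled container IDs
    // recorded for each state to aid debugging.
    for (HealthState state : HealthState.values()) {
      System.out.println(state + ": " + report.getStat(state));
      for (ContainerID id : report.getSample(state)) {
        System.out.println("  sample: " + id);
      }
    }
  }
}
```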
@@ -398,7 +413,9 @@ private synchronized void run() { * * @param container ContainerInfo */ - private void processContainer(ContainerInfo container) { + @SuppressWarnings("checkstyle:methodlength") + private void processContainer(ContainerInfo container, + ReplicationManagerReport report) { if (!shouldRun()) { return; } @@ -410,6 +427,7 @@ private void processContainer(ContainerInfo container) { final Set<ContainerReplica> replicas = containerManager .getContainerReplicas(id); final LifeCycleState state = container.getState(); + report.increment(state); /* * We don't take any action if the container is in OPEN state and */ if (state == LifeCycleState.OPEN) { if (!isOpenContainerHealthy(container, replicas)) { + report.incrementAndSample( + HealthState.OPEN_UNHEALTHY, container.containerID()); eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); } return; } @@ -442,10 +462,14 @@ * If the container is in QUASI_CLOSED state, check and close the * container if possible. */ - if (state == LifeCycleState.QUASI_CLOSED && - canForceCloseContainer(container, replicas)) { - forceCloseContainer(container, replicas); - return; + if (state == LifeCycleState.QUASI_CLOSED) { + if (canForceCloseContainer(container, replicas)) { + forceCloseContainer(container, replicas); + return; + } else { + report.incrementAndSample(HealthState.QUASI_CLOSED_STUCK, + container.containerID()); + } } /* @@ -498,6 +522,8 @@ * exact number of replicas in the same state. */ if (isContainerEmpty(container, replicas)) { + report.incrementAndSample( + HealthState.EMPTY, container.containerID()); /* * If container is empty, schedule task to delete the container. */ @@ -509,8 +535,22 @@ * Check if the container is under replicated and take appropriate * action. */ - if (!replicaSet.isSufficientlyReplicated() - || !placementStatus.isPolicySatisfied()) { + boolean sufficientlyReplicated = replicaSet.isSufficientlyReplicated(); + boolean placementSatisfied = placementStatus.isPolicySatisfied(); + if (!sufficientlyReplicated || !placementSatisfied) { + if (!sufficientlyReplicated) { + report.incrementAndSample( + HealthState.UNDER_REPLICATED, container.containerID()); + if (replicaSet.isMissing()) { + report.incrementAndSample(HealthState.MISSING, + container.containerID()); + } + } + if (!placementSatisfied) { + report.incrementAndSample(HealthState.MIS_REPLICATED, + container.containerID()); + + } handleUnderReplicatedContainer(container, replicaSet, placementStatus); return; @@ -521,6 +561,8 @@ * action. */ if (replicaSet.isOverReplicated()) { + report.incrementAndSample(HealthState.OVER_REPLICATED, + container.containerID()); handleOverReplicatedContainer(container, replicaSet); return; } @@ -531,6 +573,8 @@ are not in the same state as the container itself.
*/ if (!replicaSet.isHealthy()) { + report.incrementAndSample(HealthState.UNHEALTHY, + container.containerID()); handleUnstableContainer(container, replicas); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java new file mode 100644 index 000000000000..21bde49cec1c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManagerReport.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; + +/** + * This class is used by ReplicationManager. Each time ReplicationManager runs, + * it creates a new instance of this class and increments the various counters + * to allow for creating a report on the various container states within the + * system. There is a counter for each LifeCycleState (open, closing, closed + * etc.) and the sum of the lifecycle state counters should equal the + * total number of containers in SCM, as each container can only be in one of + * the lifecycle states at any time. + * + * Additionally, there is a set of counters for the "health state" of the + * containers, defined here in the HealthState enum. It is normal for containers + * to be in these health states from time to time, but the presence of a + * container in one of these health states generally means the cluster is in a + * degraded state. Normally, the cluster will recover by itself, but manual + * intervention may be needed in some cases. + * + * To aid debugging, when containers are in one of the health states, a list of + * up to SAMPLE_LIMIT container IDs is recorded in the report for each of the + * states. + */ +public class ReplicationManagerReport { + + public static final int SAMPLE_LIMIT = 100; + private long reportTimeStamp; + + /** + * Enum representing the various health states a container can be in.
+ */ + public enum HealthState { + UNDER_REPLICATED("Containers with insufficient replicas", + "NumUnderReplicatedContainers"), + MIS_REPLICATED("Containers with insufficient racks", + "NumMisReplicatedContainers"), + OVER_REPLICATED("Containers with more replicas than required", + "NumOverReplicatedContainers"), + MISSING("Containers with no online replicas", + "NumMissingContainers"), + UNHEALTHY( + "Containers Closed or Quasi_Closed having some replicas in " + + "a different state", "NumUnhealthyContainers"), + EMPTY("Containers having no blocks", "NumEmptyContainers"), + OPEN_UNHEALTHY( + "Containers open and having replicas with different states", + "NumOpenUnhealthyContainers"), + QUASI_CLOSED_STUCK( + "Containers QuasiClosed with insufficient datanode origins", + "NumStuckQuasiClosedContainers"); + + private String description; + private String metricName; + + HealthState(String desc, String name) { + this.description = desc; + this.metricName = name; + } + + public String getMetricName() { + return this.metricName; + } + + public String getDescription() { + return this.description; + } + } + + private final Map<String, LongAdder> stats; + private final Map<String, List<ContainerID>> containerSample + = new ConcurrentHashMap<>(); + + public ReplicationManagerReport() { + stats = createStatsMap(); + } + + public void increment(HealthState stat) { + increment(stat.toString()); + } + + public void increment(HddsProtos.LifeCycleState stat) { + increment(stat.toString()); + } + + public void incrementAndSample(HealthState stat, ContainerID container) { + incrementAndSample(stat.toString(), container); + } + + public void incrementAndSample(HddsProtos.LifeCycleState stat, + ContainerID container) { + incrementAndSample(stat.toString(), container); + } + + public void setComplete() { + reportTimeStamp = System.currentTimeMillis(); + } + + /** + * The epoch time in milliseconds when this report was completed. + * @return epoch time in milliseconds.
+ */ + public long getReportTimeStamp() { + return reportTimeStamp; + } + + public long getStat(HddsProtos.LifeCycleState stat) { + return getStat(stat.toString()); + } + + public long getStat(HealthState stat) { + return getStat(stat.toString()); + } + + private long getStat(String stat) { + return stats.get(stat).longValue(); + } + + public List<ContainerID> getSample(HddsProtos.LifeCycleState stat) { + return getSample(stat.toString()); + } + + public List<ContainerID> getSample(HealthState stat) { + return getSample(stat.toString()); + } + + private List<ContainerID> getSample(String stat) { + List<ContainerID> list = containerSample.get(stat); + if (list == null) { + return Collections.emptyList(); + } + synchronized (list) { + return new ArrayList<>(list); + } + } + + private void increment(String stat) { + LongAdder adder = stats.get(stat); + if (adder == null) { + throw new IllegalArgumentException("Unexpected stat " + stat); + } + adder.increment(); + } + + private void incrementAndSample(String stat, ContainerID container) { + increment(stat); + List<ContainerID> list = containerSample + .computeIfAbsent(stat, k -> new ArrayList<>()); + synchronized (list) { + if (list.size() < SAMPLE_LIMIT) { + list.add(container); + } + } + } + + private Map<String, LongAdder> createStatsMap() { + Map<String, LongAdder> map = new HashMap<>(); + for (HddsProtos.LifeCycleState s : HddsProtos.LifeCycleState.values()) { + map.put(s.toString(), new LongAdder()); + } + for (HealthState s : HealthState.values()) { + map.put(s.toString(), new LongAdder()); + } + return map; + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java index 9a70f6974b8c..0f828aef1a68 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java @@ -16,7 +16,10 @@ */ package org.apache.hadoop.hdds.scm.container.replication; +import com.google.common.base.CaseFormat; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -30,6 +33,13 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.OzoneConsts; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport.HealthState; + /** * Class contains metrics related to ReplicationManager.
*/ @@ -51,6 +61,28 @@ public final class ReplicationManagerMetrics implements MetricsSource { "InflightMove", "Tracked inflight container move requests."); + // Setup metric names and descriptions for Container Lifecycle states + private static final Map<LifeCycleState, MetricsInfo> LIFECYCLE_STATE_METRICS + = Collections.unmodifiableMap( + new LinkedHashMap<LifeCycleState, MetricsInfo>() {{ + for (LifeCycleState s : LifeCycleState.values()) { + String name = CaseFormat.UPPER_UNDERSCORE + .to(CaseFormat.UPPER_CAMEL, s.toString()); + String metric = "Num" + name + "Containers"; + String description = "Containers in " + name + " state"; + put(s, Interns.info(metric, description)); + } + }}); + + // Setup metric names and descriptions for Container Health states + private static final Map<HealthState, MetricsInfo> + CONTAINER_HEALTH_STATE_METRICS = Collections.unmodifiableMap( + new LinkedHashMap<HealthState, MetricsInfo>() {{ + for (HealthState s : HealthState.values()) { + put(s, Interns.info(s.getMetricName(), s.getDescription())); + } + }}); + @Metric("Number of replication commands sent.") private MutableCounterLong numReplicationCmdsSent; @@ -110,6 +142,16 @@ public void getMetrics(MetricsCollector collector, boolean all) { .addGauge(INFLIGHT_DELETION, getInflightDeletion()) .addGauge(INFLIGHT_MOVE, getInflightMove()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + for (Map.Entry<LifeCycleState, MetricsInfo> e : + LIFECYCLE_STATE_METRICS.entrySet()) { + builder.addGauge(e.getValue(), report.getStat(e.getKey())); + } + for (Map.Entry<HealthState, MetricsInfo> e : + CONTAINER_HEALTH_STATE_METRICS.entrySet()) { + builder.addGauge(e.getValue(), report.getStat(e.getKey())); + } + numReplicationCmdsSent.snapshot(builder, all); numReplicationCmdsCompleted.snapshot(builder, all); numReplicationCmdsTimeout.snapshot(builder, all); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index ea42292e973d..cfaabd74ad68 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -263,8 +263,9 @@ public void testOpenContainer() throws IOException { containerStateManager.addContainer(container.getProtobuf()); replicationManager.processAll(); eventQueue.processAll(1000); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.OPEN)); Assert.assertEquals(0, datanodeCommandHandler.getInvocation()); - } /** @@ -308,6 +309,8 @@ public void testClosingContainer() throws IOException { eventQueue.processAll(1000); Assert.assertEquals(currentCloseCommandCount + 6, datanodeCommandHandler .getInvocationCount(SCMCommandProto.Type.closeContainerCommand)); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.CLOSING)); } @@ -348,6 +351,8 @@ public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException { Assert.assertTrue(datanodeCommandHandler.received( SCMCommandProto.Type.closeContainerCommand, replicaThree.getDatanodeDetails())); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); } /** @@ -378,6 +383,10 @@ public void testHealthyQuasiClosedContainer() throws IOException { replicationManager.processAll(); eventQueue.processAll(1000); Assert.assertEquals(0,
datanodeCommandHandler.getInvocation()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); } /** @@ -459,6 +468,14 @@ public void testQuasiClosedContainerWithUnhealthyReplica() Assert.assertEquals(1, replicationManager.getMetrics() .getInflightReplication()); + // We should have one under replicated and one quasi_closed_stuck + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + // Now we add the missing replica back DatanodeDetails targetDn = replicationManager.getInflightReplication() .get(id).get(0).getDatanode(); @@ -482,6 +499,13 @@ public void testQuasiClosedContainerWithUnhealthyReplica() replicationManager.getMetrics().getNumReplicationCmdsCompleted()); Assert.assertEquals(currentBytesCompleted + 100L, replicationManager.getMetrics().getNumReplicationBytesCompleted()); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } /** @@ -523,6 +547,13 @@ public void testOverReplicatedQuasiClosedContainer() throws IOException { Assert.assertEquals(1, replicationManager.getMetrics() .getInflightDeletion()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); + // Now we remove the replica according to inflight DatanodeDetails targetDn = replicationManager.getInflightDeletion() .get(id).get(0).getDatanode(); @@ -554,6 +585,13 @@ public void testOverReplicatedQuasiClosedContainer() throws IOException { replicationManager.getMetrics().getNumDeletionCmdsCompleted()); Assert.assertEquals(deleteBytesCompleted + 101, replicationManager.getMetrics().getNumDeletionBytesCompleted()); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); } /** @@ -600,6 +638,13 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica() Assert.assertEquals(1, replicationManager.getMetrics() .getInflightDeletion()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); + final long currentDeleteCommandCompleted = replicationManager.getMetrics() .getNumDeletionCmdsCompleted(); // Now we remove the 
replica to simulate deletion complete @@ -613,6 +658,13 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica() Assert.assertEquals(0, replicationManager.getInflightDeletion().size()); Assert.assertEquals(0, replicationManager.getMetrics() .getInflightDeletion()); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); } /** @@ -652,6 +704,13 @@ public void testUnderReplicatedQuasiClosedContainer() throws IOException { Assert.assertEquals(1, replicationManager.getMetrics() .getInflightReplication()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + final long currentReplicateCommandCompleted = replicationManager .getMetrics().getNumReplicationCmdsCompleted(); final long currentReplicateBytesCompleted = replicationManager @@ -675,6 +734,13 @@ public void testUnderReplicatedQuasiClosedContainer() throws IOException { Assert.assertEquals(0, replicationManager.getInflightReplication().size()); Assert.assertEquals(0, replicationManager.getMetrics() .getInflightReplication()); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); } /** @@ -740,6 +806,15 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica() id, State.QUASI_CLOSED, 1000L, originNodeId, newNode); containerStateManager.updateContainerReplica(id.getProtobuf(), newReplica); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.UNHEALTHY)); + /* * We have reported the replica to SCM, so in the next ReplicationManager * iteration it should delete the unhealthy replica.
@@ -765,6 +840,15 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica() final long currentDeleteCommandCompleted = replicationManager.getMetrics() .getNumDeletionCmdsCompleted(); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.UNHEALTHY)); /* * We have now removed unhealthy replica, next iteration of * ReplicationManager should re-replicate the container as it @@ -788,6 +872,15 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica() Assert.assertEquals(1, replicationManager.getInflightReplication().size()); Assert.assertEquals(1, replicationManager.getMetrics() .getInflightReplication()); + + report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.UNHEALTHY)); } @@ -820,6 +913,10 @@ public void testQuasiClosedToClosed() throws IOException { Assert.assertEquals(currentCloseCommandCount + 3, datanodeCommandHandler .getInvocationCount(SCMCommandProto.Type.closeContainerCommand)); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.QUASI_CLOSED)); + Assert.assertEquals(0, report.getStat( + ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK)); } @@ -844,6 +941,13 @@ public void testHealthyClosedContainer() throws IOException { replicationManager.processAll(); eventQueue.processAll(1000); Assert.assertEquals(0, datanodeCommandHandler.getInvocation()); + + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.CLOSED)); + for (ReplicationManagerReport.HealthState s : + ReplicationManagerReport.HealthState.values()) { + Assert.assertEquals(0, report.getStat(s)); + } } /** @@ -871,6 +975,11 @@ public void testUnhealthyOpenContainer() throws IOException { eventQueue.processAll(1000); Mockito.verify(closeContainerHandler, Mockito.times(1)) .onMessage(id, eventQueue); + + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.OPEN)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.OPEN_UNHEALTHY)); } /** @@ -957,6 +1066,11 @@ public void additionalReplicaScheduledWhenMisReplicated() throws IOException { Assert.assertEquals(1, replicationManager.getMetrics() .getInflightReplication()); + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(1, report.getStat(LifeCycleState.CLOSED)); + Assert.assertEquals(1, report.getStat( + ReplicationManagerReport.HealthState.MIS_REPLICATED)); + // Now make it so that all containers seem mis-replicated no matter how // many replicas. This will test replicas are not scheduled if the new // replica does not fix the mis-replication. 
@@ -1036,6 +1150,7 @@ public void overReplicatedButRemovingMakesMisReplicated() throws IOException { Assert.assertEquals(1, replicationManager.getInflightDeletion().size()); Assert.assertEquals(1, replicationManager.getMetrics() .getInflightDeletion()); + assertOverReplicatedCount(1); } @Test @@ -1077,6 +1192,8 @@ public void testOverReplicatedAndPolicySatisfied() throws IOException { Assert.assertEquals(1, replicationManager.getInflightDeletion().size()); Assert.assertEquals(1, replicationManager.getMetrics() .getInflightDeletion()); + + assertOverReplicatedCount(1); } @Test @@ -1135,6 +1252,7 @@ public void testUnderReplicatedDueToDecommission() throws IOException { addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); assertReplicaScheduled(2); + assertUnderReplicatedCount(1); } /** @@ -1148,6 +1266,7 @@ public void testUnderReplicatedDueToAllDecommission() throws IOException { addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); assertReplicaScheduled(3); + assertUnderReplicatedCount(1); } /** @@ -1162,6 +1281,7 @@ public void testCorrectlyReplicatedWithDecommission() throws IOException { addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED); assertReplicaScheduled(0); + assertUnderReplicatedCount(0); } /** @@ -1175,6 +1295,7 @@ public void testUnderReplicatedDueToMaintenance() throws IOException { addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(1); + assertUnderReplicatedCount(1); } /** @@ -1195,6 +1316,7 @@ public void testNotUnderReplicatedDueToMaintenanceMinRepOne() addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(0); + assertUnderReplicatedCount(0); } /** @@ -1215,6 +1337,7 @@ public void testUnderReplicatedDueToMaintenanceMinRepOne() addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(1); + assertUnderReplicatedCount(1); } /** @@ -1228,6 +1351,7 @@ public void testUnderReplicatedDueToAllMaintenance() throws IOException { addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(2); + assertUnderReplicatedCount(1); } /** @@ -1242,6 +1366,7 @@ public void testCorrectlyReplicatedWithMaintenance() throws IOException { addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(0); + assertUnderReplicatedCount(0); } /** @@ -1257,8 +1382,21 @@ public void testUnderReplicatedWithDecommissionAndMaintenance() addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED); assertReplicaScheduled(2); + assertUnderReplicatedCount(1); } + /** + * ReplicationManager should schedule no replicas when all copies of a + * container are missing, as there is no source to replicate from.
+ */ + @Test + public void testContainerWithMissingReplicas() + throws IOException { + createContainer(LifeCycleState.CLOSED); + assertReplicaScheduled(0); + assertUnderReplicatedCount(1); + assertMissingCount(1); + } /** * When a CLOSED container is over replicated, ReplicationManager * deletes the excess replicas. While choosing the replica for deletion @@ -1302,6 +1440,7 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint() SCMCommandProto.Type.deleteContainerCommand, r.getDatanodeDetails())); } + assertOverReplicatedCount(1); } /** @@ -1319,6 +1458,7 @@ public void testUnderReplicatedNotHealthySource() throws IOException { // There should be replica scheduled, but as all nodes are stale, nothing // gets scheduled. assertReplicaScheduled(0); + assertUnderReplicatedCount(1); } /** @@ -1723,6 +1863,7 @@ public void testDeleteCommandTimeout() throws private ContainerInfo createContainer(LifeCycleState containerState) throws IOException { final ContainerInfo container = getContainer(containerState); + container.setUsedBytes(1234); containerStateManager.addContainer(container.getProtobuf()); return container; } @@ -1791,6 +1932,24 @@ private void assertDeleteScheduled(int delta) throws InterruptedException { replicationManager.getMetrics().getNumDeletionCmdsSent()); } + private void assertUnderReplicatedCount(int count) { + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(count, report.getStat( + ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + } + + private void assertMissingCount(int count) { + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(count, report.getStat( + ReplicationManagerReport.HealthState.MISSING)); + } + + private void assertOverReplicatedCount(int count) { + ReplicationManagerReport report = replicationManager.getContainerReport(); + Assert.assertEquals(count, report.getStat( + ReplicationManagerReport.HealthState.OVER_REPLICATED)); + } + @After public void teardown() throws Exception { containerStateManager.close(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java new file mode 100644 index 000000000000..15aca6134c98 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManagerReport.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + * Tests for the ReplicationManagerReport class. + */ +public class TestReplicationManagerReport { + + private ReplicationManagerReport report; + + @Before + public void setup() { + report = new ReplicationManagerReport(); + } + + @Test + public void testMetricCanBeIncremented() { + report.increment(ReplicationManagerReport.HealthState.UNDER_REPLICATED); + report.increment(ReplicationManagerReport.HealthState.UNDER_REPLICATED); + report.increment(ReplicationManagerReport.HealthState.OVER_REPLICATED); + + report.increment(HddsProtos.LifeCycleState.OPEN); + report.increment(HddsProtos.LifeCycleState.CLOSED); + report.increment(HddsProtos.LifeCycleState.CLOSED); + + Assert.assertEquals(2, + report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + Assert.assertEquals(1, + report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + Assert.assertEquals(0, + report.getStat(ReplicationManagerReport.HealthState.MIS_REPLICATED)); + + Assert.assertEquals(1, + report.getStat(HddsProtos.LifeCycleState.OPEN)); + Assert.assertEquals(2, + report.getStat(HddsProtos.LifeCycleState.CLOSED)); + Assert.assertEquals(0, + report.getStat(HddsProtos.LifeCycleState.QUASI_CLOSED)); + } + + @Test + public void testContainerIDsCanBeSampled() { + report.incrementAndSample( + ReplicationManagerReport.HealthState.UNDER_REPLICATED, + new ContainerID(1)); + report.incrementAndSample( + ReplicationManagerReport.HealthState.UNDER_REPLICATED, + new ContainerID(2)); + report.incrementAndSample( + ReplicationManagerReport.HealthState.OVER_REPLICATED, + new ContainerID(3)); + + Assert.assertEquals(2, + report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + Assert.assertEquals(1, + report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + Assert.assertEquals(0, + report.getStat(ReplicationManagerReport.HealthState.MIS_REPLICATED)); + + List<ContainerID> sample = + report.getSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED); + Assert.assertEquals(new ContainerID(1), sample.get(0)); + Assert.assertEquals(new ContainerID(2), sample.get(1)); + Assert.assertEquals(2, sample.size()); + + sample = + report.getSample(ReplicationManagerReport.HealthState.OVER_REPLICATED); + Assert.assertEquals(new ContainerID(3), sample.get(0)); + Assert.assertEquals(1, sample.size()); + + sample = + report.getSample(ReplicationManagerReport.HealthState.MIS_REPLICATED); + Assert.assertEquals(0, sample.size()); + } + + @Test + public void testSamplesAreLimited() { + for (int i = 0; i < ReplicationManagerReport.SAMPLE_LIMIT * 2; i++) { + report.incrementAndSample( + ReplicationManagerReport.HealthState.UNDER_REPLICATED, + new ContainerID(i)); + } + List<ContainerID> sample = + report.getSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED); + Assert.assertEquals(ReplicationManagerReport.SAMPLE_LIMIT, sample.size()); + for (int i = 0; i < ReplicationManagerReport.SAMPLE_LIMIT; i++) { + Assert.assertEquals(new ContainerID(i), sample.get(i)); + } + } +} diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java new file mode 100644 index 000000000000..60ebbb0b866d --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManagerMetrics.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.apache.hadoop.test.MetricsAsserts.getLongGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +/** + * Tests for the ReplicationManagerMetrics class. + */ +public class TestReplicationManagerMetrics { + + private ReplicationManager replicationManager; + private ReplicationManagerMetrics metrics; + + @Before + public void setup() { + ReplicationManagerReport report = new ReplicationManagerReport(); + + // Each lifecycle state has a proto number from 1 to N. Set each state's + // counter to that number by incrementing it that many times. + for (HddsProtos.LifeCycleState s : HddsProtos.LifeCycleState.values()) { + for (int i = 0; i < s.getNumber(); i++) { + report.increment(s); + } + } + // Health state ordinals start at 0, so each state's counter ends up + // equal to its ordinal. + for (ReplicationManagerReport.HealthState s : + ReplicationManagerReport.HealthState.values()) { + for (int i = 0; i < s.ordinal(); i++) { + report.increment(s); + } + } + replicationManager = Mockito.mock(ReplicationManager.class); + Mockito.when(replicationManager.getContainerReport()).thenReturn(report); + metrics = ReplicationManagerMetrics.create(replicationManager); + } + + @After + public void after() { + metrics.unRegister(); + } + + @Test + public void testLifeCycleStateMetricsPresent() { + Assert.assertEquals(HddsProtos.LifeCycleState.OPEN.getNumber(), + getGauge("NumOpenContainers")); + Assert.assertEquals(HddsProtos.LifeCycleState.CLOSING.getNumber(), + getGauge("NumClosingContainers")); + Assert.assertEquals(HddsProtos.LifeCycleState.QUASI_CLOSED.getNumber(), + getGauge("NumQuasiClosedContainers")); + Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED.getNumber(), + getGauge("NumClosedContainers")); + Assert.assertEquals(HddsProtos.LifeCycleState.DELETING.getNumber(), + getGauge("NumDeletingContainers")); + Assert.assertEquals(HddsProtos.LifeCycleState.DELETED.getNumber(), + getGauge("NumDeletedContainers")); + } + + @Test + public void testHealthStateMetricsPresent() { + for (ReplicationManagerReport.HealthState s : + ReplicationManagerReport.HealthState.values()) { + Assert.assertEquals(s.ordinal(), getGauge(s.getMetricName())); + } + } + + private long getGauge(String metricName) { + return getLongGauge(metricName, + getMetrics(ReplicationManagerMetrics.METRICS_SOURCE_NAME)); + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java index 69b038cf09d2..405da0f6aa09 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java +++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java @@ -431,6 +441,16 @@ public void testIsHealthyWithMaintReplicaIsHealthy() { assertTrue(rcnt.isHealthy()); } + @Test + public void testContainerWithNoReplicasIsMissing() { + Set<ContainerReplica> replica = new HashSet<>(); + ContainerInfo container = createContainer(HddsProtos.LifeCycleState.CLOSED); + ContainerReplicaCount rcnt = + new ContainerReplicaCount(container, replica, 0, 0, 3, 2); + assertTrue(rcnt.isMissing()); + assertFalse(rcnt.isSufficientlyReplicated()); + } + private void validate(ContainerReplicaCount rcnt, boolean sufficientlyReplicated, int replicaDelta, boolean overReplicated) {
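(Reviewer note: the lifecycle gauge names asserted in TestReplicationManagerMetrics come from the Guava CaseFormat conversion in ReplicationManagerMetrics. Below is a small standalone sketch of that naming rule for anyone verifying the expected metric names; the MetricNameDemo class is illustrative only.)

```java
import com.google.common.base.CaseFormat;

public final class MetricNameDemo {

  private MetricNameDemo() {
  }

  public static void main(String[] args) {
    // Mirrors ReplicationManagerMetrics: convert the enum constant from
    // UPPER_UNDERSCORE to UPPER_CAMEL and wrap it as "Num<State>Containers".
    for (String state : new String[] {"OPEN", "QUASI_CLOSED", "DELETED"}) {
      String camel =
          CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, state);
      System.out.println("Num" + camel + "Containers");
    }
    // Prints:
    //   NumOpenContainers
    //   NumQuasiClosedContainers
    //   NumDeletedContainers
  }
}
```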