diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 45a1db681542..6c30ba17a529 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -185,6 +185,8 @@ message ContainerReplicaProto { CLOSED = 4; UNHEALTHY = 5; INVALID = 6; + DECOMMISSIONED = 7; + MAINTENANCE = 8; } required int64 containerID = 1; required State state = 2; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java new file mode 100644 index 000000000000..a7ea56d8e018 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import java.util.Set; + +/** + * Immutable object that is created with a set of ContainerReplica objects and + * the number of in-flight replica adds and deletes, the container replication + * factor and the min count which must be available for maintenance. This + * information can be used to determine if the container is over or under + * replicated and also how many additional replicas need to be created or removed.
+ */ +public class ContainerReplicaCount { + + private int healthyCount = 0; + private int decommissionCount = 0; + private int maintenanceCount = 0; + private int inFlightAdd = 0; + private int inFlightDel = 0; + private int repFactor; + private int minHealthyForMaintenance; + private Set<ContainerReplica> replica; + + public ContainerReplicaCount(Set<ContainerReplica> replica, int inFlightAdd, + int inFlightDelete, int replicationFactor, + int minHealthyForMaintenance) { + this.healthyCount = 0; + this.decommissionCount = 0; + this.maintenanceCount = 0; + this.inFlightAdd = inFlightAdd; + this.inFlightDel = inFlightDelete; + this.repFactor = replicationFactor; + this.replica = replica; + this.minHealthyForMaintenance + = Math.min(this.repFactor, minHealthyForMaintenance); + + for (ContainerReplica cr : this.replica) { + ContainerReplicaProto.State state = cr.getState(); + if (state == ContainerReplicaProto.State.DECOMMISSIONED) { + decommissionCount++; + } else if (state == ContainerReplicaProto.State.MAINTENANCE) { + maintenanceCount++; + } else { + healthyCount++; + } + } + } + + public int getHealthyCount() { + return healthyCount; + } + + public int getDecommissionCount() { + return decommissionCount; + } + + public int getMaintenanceCount() { + return maintenanceCount; + } + + public int getReplicationFactor() { + return repFactor; + } + + public Set<ContainerReplica> getReplica() { + return replica; + } + + @Override + public String toString() { + return "Replica Count: "+replica.size()+ + " Healthy Count: "+healthyCount+ + " Decommission Count: "+decommissionCount+ + " Maintenance Count: "+maintenanceCount+ + " inFlightAdd Count: "+inFlightAdd+ + " inFlightDel Count: "+inFlightDel+ + " ReplicationFactor: "+repFactor+ + " minMaintenance Count: "+minHealthyForMaintenance; + } + + /** + * Calculates the delta of replicas which need to be created or removed + * to ensure the container is correctly replicated when considering inflight + * adds and deletes. + * + * When considering inflight operations, it is assumed any operation will + * fail. However, to consider the worst case and avoid data loss, we always + * assume a delete will succeed and an add will fail. In this way, we will + * avoid scheduling too many deletes which could result in data loss. + * + * Decisions around over-replication are made only on healthy replicas, + * ignoring any in maintenance and also any inflight adds. Inflight adds are + * ignored, as they may not complete, so if we have: + * + * H, H, H, IN_FLIGHT_ADD + * + * and then schedule a delete, we could end up under-replicated (add fails, + * delete completes). It is better to let the inflight operations complete + * and then deal with any further over or under replication. + * + * For maintenance replicas, assuming replication factor 3 and minHealthy + * 2, it is possible for all 3 hosts to be put into maintenance, leaving the + * following (H = healthy, M = maintenance): + * + * H, H, M, M, M + * + * Even though we are tracking 5 replicas, this is not over replicated as we + * ignore the maintenance copies. Later, the replicas could look like: + * + * H, H, H, H, M + * + * At this stage, the container is over replicated by 1, so one replica can be + * removed. + * + * For containers which already have replication factor healthy replicas, we + * ignore any inflight adds or deletes, as they may fail. Instead, wait for + * them to complete and then deal with any excess or deficit. + * + * For under replicated containers we do consider inflight adds and deletes to + * avoid scheduling more adds than needed.
There is additional logic around + * containers with maintenance replicas to ensure minHealthyForMaintenance + * replicas are maintained. + * + * @return Delta of replicas needed. Negative indicates over replication and + * containers should be removed. Positive indicates under replication + * and zero indicates the container has replicationFactor healthy + * replicas + */ + public int additionalReplicaNeeded() { + int delta = missingReplicas(); + + if (delta < 0) { + // Over replicated, so may need to remove a container. Do not consider + // inFlightAdds, as they may fail, but do consider inFlightDel which + // will reduce the over-replication if it completes. + // Note this could make the delta positive if there are too many in flight + // deletes, which will result in an additional replica being scheduled. + return delta + inFlightDel; + } else { + // May be under or perfectly replicated. + // We must consider in flight adds and deletes when calculating the new + // containers needed, but we bound the lower limit at zero to allow + // inflight operations to complete before handling any potential over + // replication. + return Math.max(0, delta - inFlightAdd + inFlightDel); + } + } + + /** + * Returns the count of replicas which need to be created or removed to + * ensure the container is perfectly replicated. Inflight operations are not + * considered here, but the logic to determine the missing or excess counts + * for maintenance is present. + * + * Decisions around over-replication are made only on healthy replicas, + * ignoring any in maintenance. For example, if we have: + * + * H, H, H, M, M + * + * This will not be considered over replicated until one of the Maintenance + * replicas moves to Healthy. + * + * If the container is perfectly replicated, zero will be returned. + * + * If it is under replicated a positive value will be returned, indicating + * how many replicas must be added. + * + * If it is over replicated a negative value will be returned, indicating how + * many replicas to remove. + * + * @return Zero if the container is perfectly replicated, a positive value + * for under replicated and a negative value for over replicated. + */ + private int missingReplicas() { + int delta = repFactor - healthyCount; + + if (delta < 0) { + // Over replicated, so may need to remove a container. + return delta; + } else if (delta > 0) { + // May be under-replicated, depending on maintenance. + delta = Math.max(0, delta - maintenanceCount); + int neededHealthy = + Math.max(0, minHealthyForMaintenance - healthyCount); + delta = Math.max(neededHealthy, delta); + return delta; + } else { // delta == 0 + // We have exactly the number of healthy replicas needed. + return delta; + } + } + + /** + * Return true if the container is sufficiently replicated. Decommissioning + * and Decommissioned replicas are ignored in this check, assuming they will + * eventually be removed from the cluster. + * This check ignores inflight additions, as those replicas have not yet been + * created and the create could fail for some reason. + * The check does consider inflight deletes as there may be 3 healthy replicas + * now, but once the delete completes it will reduce to 2. + * We also assume a replica in Maintenance state cannot be removed, so the + * pending delete would affect only the healthy replica count. + * + * @return True if the container is sufficiently replicated and False + * otherwise.
+ */ + public boolean isSufficientlyReplicated() { + return missingReplicas() + inFlightDel <= 0; + } + + /** + * Return true is the container is over replicated. Decommission and + * maintenance containers are ignored for this check. + * The check ignores inflight additions, as they may fail, but it does + * consider inflight deletes, as they would reduce the over replication when + * they complete. + * + * @return True if the container is over replicated, false otherwise. + */ + public boolean isOverReplicated() { + return missingReplicas() + inFlightDel < 0; + } +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 37afd36da019..58e38a26708c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; @@ -97,6 +98,11 @@ public class ReplicationManager implements MetricsSource { */ private final LockManager lockManager; + /** + * Used to lookup the health of a nodes or the nodes operational state. + */ + private final NodeManager nodeManager; + /** * This is used for tracking container replication commands which are issued * by ReplicationManager and not yet complete. @@ -126,6 +132,11 @@ public class ReplicationManager implements MetricsSource { */ private volatile boolean running; + /** + * Minimum number of replica in a healthy state for maintenance. + */ + private int minHealthyForMaintenance; + /** * Constructs ReplicationManager instance with the given configuration. * @@ -138,15 +149,18 @@ public ReplicationManager(final ReplicationManagerConfiguration conf, final ContainerManager containerManager, final ContainerPlacementPolicy containerPlacement, final EventPublisher eventPublisher, - final LockManager lockManager) { + final LockManager lockManager, + final NodeManager nodeManager) { this.containerManager = containerManager; this.containerPlacement = containerPlacement; this.eventPublisher = eventPublisher; this.lockManager = lockManager; + this.nodeManager = nodeManager; this.conf = conf; this.running = false; this.inflightReplication = new ConcurrentHashMap<>(); this.inflightDeletion = new ConcurrentHashMap<>(); + this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum(); } /** @@ -241,7 +255,7 @@ private synchronized void run() { * @param id ContainerID */ private void processContainer(ContainerID id) { - lockManager.lock(id); + lockManager.writeLock(id); try { final ContainerInfo container = containerManager.getContainer(id); final Set replicas = containerManager @@ -291,24 +305,15 @@ private void processContainer(ContainerID id) { action -> replicas.stream() .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode))); - - /* - * We don't have to take any action if the container is healthy. 
- * - * According to ReplicationMonitor container is considered healthy if - * the container is either in QUASI_CLOSED or in CLOSED state and has - * exact number of replicas in the same state. - */ - if (isContainerHealthy(container, replicas)) { - return; - } + ContainerReplicaCount replicaSet = + getContainerReplicaCount(container, replicas); /* * Check if the container is under replicated and take appropriate * action. */ - if (isContainerUnderReplicated(container, replicas)) { - handleUnderReplicatedContainer(container, replicas); + if (!replicaSet.isSufficientlyReplicated()) { + handleUnderReplicatedContainer(container, replicaSet); return; } @@ -316,22 +321,24 @@ private void processContainer(ContainerID id) { * Check if the container is over replicated and take appropriate * action. */ - if (isContainerOverReplicated(container, replicas)) { - handleOverReplicatedContainer(container, replicas); + if (replicaSet.isOverReplicated()) { + handleOverReplicatedContainer(container, replicaSet); return; } /* - * The container is neither under nor over replicated and the container - * is not healthy. This means that the container has unhealthy/corrupted - * replica. + If we get here, the container is not over replicated or under replicated + but it may be "unhealthy", which means it has one or more replica which + are not in the same state as the container itself. */ - handleUnstableContainer(container, replicas); + if (!isContainerHealthy(container, replicas)) { + handleUnstableContainer(container, replicas); + } } catch (ContainerNotFoundException ex) { LOG.warn("Missing container {}.", id); } finally { - lockManager.unlock(id); + lockManager.writeUnlock(id); } } @@ -361,7 +368,8 @@ private void updateInflightAction(final ContainerInfo container, * Returns true if the container is healthy according to ReplicationMonitor. * * According to ReplicationMonitor container is considered healthy if - * it has exact number of replicas in the same state as the container. + * all replica which are not in a decommission or maintenance state are in + * the same state as the container and in QUASI_CLOSED or in CLOSED state. * * @param container Container to check * @param replicas Set of ContainerReplicas @@ -369,50 +377,76 @@ private void updateInflightAction(final ContainerInfo container, */ private boolean isContainerHealthy(final ContainerInfo container, final Set replicas) { - return container.getReplicationFactor().getNumber() == replicas.size() && - replicas.stream().allMatch( - r -> compareState(container.getState(), r.getState())); + return (container.getState() == LifeCycleState.CLOSED + || container.getState() == LifeCycleState.QUASI_CLOSED) + && replicas.stream() + .filter(r -> r.getState() != State.DECOMMISSIONED) + .filter(r -> r.getState() != State.MAINTENANCE) + .allMatch(r -> compareState(container.getState(), r.getState())); } /** - * Checks if the container is under replicated or not. - * - * @param container Container to check - * @param replicas Set of ContainerReplicas - * @return true if the container is under replicated, false otherwise + * Returns the number replica which are pending creation for the given + * container ID. 
+ * @param id The ContainerID for which to check the pending replica + * @return The number of inflight additions or zero if none */ - private boolean isContainerUnderReplicated(final ContainerInfo container, - final Set replicas) { - return container.getReplicationFactor().getNumber() > - getReplicaCount(container.containerID(), replicas); + private int getInflightAdd(final ContainerID id) { + return inflightReplication.getOrDefault(id, Collections.emptyList()).size(); } /** - * Checks if the container is over replicated or not. - * - * @param container Container to check - * @param replicas Set of ContainerReplicas - * @return true if the container if over replicated, false otherwise + * Returns the number replica which are pending delete for the given + * container ID. + * @param id The ContainerID for which to check the pending replica + * @return The number of inflight deletes or zero if none */ - private boolean isContainerOverReplicated(final ContainerInfo container, - final Set replicas) { - return container.getReplicationFactor().getNumber() < - getReplicaCount(container.containerID(), replicas); + private int getInflightDel(final ContainerID id) { + return inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); } /** - * Returns the replication count of the given container. This also - * considers inflight replication and deletion. + * Given a container, obtain the set of known replica for it, and return a + * ContainerReplicaCount object. This object will contain the set of replica + * as well as all information required to determine if the container is over + * or under replicated, including the delta of replica required to repair the + * over or under replication. * - * @param id ContainerID - * @param replicas Set of existing replicas - * @return number of estimated replicas for this container - */ - private int getReplicaCount(final ContainerID id, - final Set replicas) { - return replicas.size() - + inflightReplication.getOrDefault(id, Collections.emptyList()).size() - - inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); + * @param container The container to create a ContainerReplicaCount for + * @return ContainerReplicaCount representing the replicated state of the + * container. + * @throws ContainerNotFoundException + */ + public ContainerReplicaCount getContainerReplicaCount(ContainerInfo container) + throws ContainerNotFoundException { + lockManager.readLock(container.containerID()); + try { + final Set replica = containerManager + .getContainerReplicas(container.containerID()); + return getContainerReplicaCount(container, replica); + } finally { + lockManager.readUnlock(container.containerID()); + } + } + + /** + * Given a container and its set of replicas, create and return a + * ContainerReplicaCount representing the container. + * + * @param container The container for which to construct a + * ContainerReplicaCount + * @param replica The set of existing replica for this container + * @return ContainerReplicaCount representing the current state of the + * container + */ + private ContainerReplicaCount getContainerReplicaCount( + ContainerInfo container, Set replica) { + return new ContainerReplicaCount( + replica, + getInflightAdd(container.containerID()), + getInflightDel(container.containerID()), + container.getReplicationFactor().getNumber(), + minHealthyForMaintenance); } /** @@ -478,13 +512,29 @@ private void forceCloseContainer(final ContainerInfo container, * and send replicate container command to the identified datanode(s). 
* * @param container ContainerInfo - * @param replicas Set of ContainerReplicas + * @param replicaSet An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes */ private void handleUnderReplicatedContainer(final ContainerInfo container, - final Set replicas) { - LOG.debug("Handling underreplicated container: {}", + final ContainerReplicaCount replicaSet) { + LOG.debug("Handling under replicated container: {}", container.getContainerID()); + Set replicas = replicaSet.getReplica(); try { + + if (replicaSet.isSufficientlyReplicated()) { + LOG.info("The container {} with replicas {} is sufficiently "+ + "replicated", container.getContainerID(), replicaSet); + return; + } + int repDelta = replicaSet.additionalReplicaNeeded(); + if (repDelta <= 0) { + LOG.info("The container {} with {} is not sufficiently " + + "replicated but no further replicas will be scheduled until "+ + "in-flight operations complete", + container.getContainerID(), replicaSet); + return; + } final ContainerID id = container.containerID(); final List deletionInFlight = inflightDeletion .getOrDefault(id, Collections.emptyList()) @@ -494,15 +544,19 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, final List source = replicas.stream() .filter(r -> r.getState() == State.QUASI_CLOSED || - r.getState() == State.CLOSED) + r.getState() == State.CLOSED || + r.getState() == State.DECOMMISSIONED || + r.getState() == State.MAINTENANCE) + // Exclude stale and dead nodes. This is particularly important for + // maintenance nodes, as the replicas will remain present in the + // container manager, even when they go dead. + .filter(r -> + nodeManager.getNodeStatus(r.getDatanodeDetails()).isHealthy()) .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails())) .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId())) .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toList()); if (source.size() > 0) { - final int replicationFactor = container - .getReplicationFactor().getNumber(); - final int delta = replicationFactor - getReplicaCount(id, replicas); final List excludeList = replicas.stream() .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toList()); @@ -511,13 +565,14 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, actionList.stream().map(r -> r.datanode) .forEach(excludeList::add); } + // At this point we have all live source nodes and we have consider final List selectedDatanodes = containerPlacement - .chooseDatanodes(excludeList, null, delta, + .chooseDatanodes(excludeList, null, repDelta, container.getUsedBytes()); LOG.info("Container {} is under replicated. Expected replica count" + - " is {}, but found {}.", id, replicationFactor, - replicationFactor - delta); + " is {}, but found {}. An additional {} replica are needed", + id, replicaSet.getReplicationFactor(), replicaSet, repDelta); for (DatanodeDetails datanode : selectedDatanodes) { sendReplicateCommand(container, datanode, source); @@ -538,17 +593,16 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, * identified datanode(s). 
* * @param container ContainerInfo - * @param replicas Set of ContainerReplicas + * @param replicaSet An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes */ private void handleOverReplicatedContainer(final ContainerInfo container, - final Set replicas) { + final ContainerReplicaCount replicaSet) { + final Set replicas = replicaSet.getReplica(); final ContainerID id = container.containerID(); final int replicationFactor = container.getReplicationFactor().getNumber(); - // Dont consider inflight replication while calculating excess here. - final int excess = replicas.size() - replicationFactor - - inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); - + final int excess = replicaSet.additionalReplicaNeeded() * -1; if (excess > 0) { LOG.info("Container {} is over replicated. Expected replica count" + @@ -566,6 +620,11 @@ private void handleOverReplicatedContainer(final ContainerInfo container, // Retain one healthy replica per origin node Id. final List eligibleReplicas = new ArrayList<>(replicas); eligibleReplicas.removeAll(uniqueReplicas.values()); + // Replica which are maintenance or decommissioned are not eligible to + // be removed, as they do not count toward over-replication and they also + // many not be available + eligibleReplicas.removeIf(r -> (r.getState() == State.MAINTENANCE + || r.getState() == State.DECOMMISSIONED)); final List unhealthyReplicas = eligibleReplicas .stream() @@ -801,6 +860,12 @@ public static class ReplicationManagerConfiguration { */ private long eventTimeout = 10 * 60 * 1000; + /** + * The number of container replica which must be available for a node to + * enter maintenance. + */ + private int maintenanceReplicaMinimum = 2; + @Config(key = "thread.interval", type = ConfigType.TIME, defaultValue = "300s", @@ -825,6 +890,19 @@ public void setEventTimeout(long eventTimeout) { this.eventTimeout = eventTimeout; } + @Config(key = "maintenance.replica.minimum", + type = ConfigType.INT, + defaultValue = "2", + tags = {SCM, OZONE}, + description = "The minimum number of container replicas which must " + + " be available for a node to enter maintenance. If putting a " + + " node into maintenance reduces the available replicas for any " + + " container below this level, the node will remain in the " + + " entering maintenance state until a new replica is created.") + public void setMaintenanceReplicaMinimum(int replicaCount) { + this.maintenanceReplicaMinimum = replicaCount; + } + public long getInterval() { return interval; } @@ -832,6 +910,10 @@ public long getInterval() { public long getEventTimeout() { return eventTimeout; } + + public int getMaintenanceReplicaMinimum() { + return maintenanceReplicaMinimum; + } } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java index 0776c2894e15..5c1adf7ad526 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStatus.java @@ -61,6 +61,75 @@ public HddsProtos.NodeOperationalState getOperationalState() { return operationalState; } + /** + * Returns true if the nodeStatus indicates the node is in any decommission + * state. 
+ * + * @return True if the node is in any decommission state, false otherwise + */ + public boolean isDecommission() { + return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING + || operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED; + } + + /** + * Returns true if the node is currently decommissioning. + * + * @return True if the node is decommissioning, false otherwise + */ + public boolean isDecommissioning() { + return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONING; + } + + /** + * Returns true if the node is decommissioned. + * + * @return True if the node is decommissioned, false otherwise + */ + public boolean isDecommissioned() { + return operationalState == HddsProtos.NodeOperationalState.DECOMMISSIONED; + } + + /** + * Returns true if the node is in any maintenance state. + * + * @return True if the node is in any maintenance state, false otherwise + */ + public boolean isMaintenance() { + return operationalState + == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE + || operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE; + } + + /** + * Returns true if the node is currently entering maintenance. + * + * @return True if the node is entering maintenance, false otherwise + */ + public boolean isEnteringMaintenance() { + return operationalState + == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; + } + + /** + * Returns true if the node is currently in maintenance. + * + * @return True if the node is in maintenance, false otherwise. + */ + public boolean isInMaintenance() { + return operationalState == HddsProtos.NodeOperationalState.IN_MAINTENANCE; + } + + /** + * Returns true if the nodeStatus is healthy (ie not stale or dead) and false + * otherwise. + * + * @return True if the node is Healthy, false otherwise + */ + public boolean isHealthy() { + return health == HddsProtos.NodeState.HEALTHY; + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 4df38a3af323..d285e192bed1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -434,7 +434,8 @@ private void initializeSystemManagers(OzoneConfiguration conf, containerManager, containerPlacementPolicy, eventQueue, - new LockManager<>(conf)); + new LockManager<>(conf), + scmNodeManager); } if(configurator.getScmSafeModeManager() != null) { scmSafeModeManager = configurator.getScmSafeModeManager(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java index 1631447af1f5..158b1bd437de 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java @@ -21,21 +21,36 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; +import org.apache.hadoop.hdds.scm.container.ReplicationManager + .ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.lock.LockManager; +import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -69,12 +84,16 @@ public class TestReplicationManager { private ContainerPlacementPolicy containerPlacementPolicy; private EventQueue eventQueue; private DatanodeCommandHandler datanodeCommandHandler; + private SimpleNodeManager nodeManager; + private ContainerManager containerManager; + private Configuration conf; @Before public void setup() throws IOException, InterruptedException { - final Configuration conf = new OzoneConfiguration(); - final ContainerManager containerManager = + conf = new OzoneConfiguration(); + containerManager = Mockito.mock(ContainerManager.class); + nodeManager = new SimpleNodeManager(); eventQueue = new EventQueue(); containerStateManager = new ContainerStateManager(conf); @@ -106,12 +125,27 @@ public void setup() throws IOException, InterruptedException { .collect(Collectors.toList()); }); + createReplicationManager(new ReplicationManagerConfiguration()); replicationManager = new ReplicationManager( new ReplicationManagerConfiguration(), containerManager, containerPlacementPolicy, eventQueue, - new LockManager<>(conf)); + new LockManager<>(conf), + nodeManager); + replicationManager.start(); + Thread.sleep(100L); + } + + private void createReplicationManager(ReplicationManagerConfiguration rmConf) + throws InterruptedException { + replicationManager = new ReplicationManager( + rmConf, + containerManager, + containerPlacementPolicy, + eventQueue, + new LockManager<>(conf), + nodeManager); replicationManager.start(); Thread.sleep(100L); } @@ -606,6 +640,213 @@ public void testGeneratedConfig() { } + /** + * ReplicationManager should replicate an additional replica if there are + * decommissioned replicas. 
+ */ + @Test + public void testUnderReplicatedDueToDecommission() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED); + assertReplicaScheduled(2); + } + + /** + * ReplicationManager should replicate an additional replica when all copies + * are decommissioning. + */ + @Test + public void testUnderReplicatedDueToAllDecommission() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.DECOMMISSIONED, State.DECOMMISSIONED, State.DECOMMISSIONED); + assertReplicaScheduled(3); + } + + /** + * ReplicationManager should not take any action when the container is + * correctly replicated with decommissioned replicas still present. + */ + @Test + public void testCorrectlyReplicatedWithDecommission() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.CLOSED, State.CLOSED, State.CLOSED, State.DECOMMISSIONED); + assertReplicaScheduled(0); + } + + /** + * ReplicationManager should replicate an additional replica when min rep + * is not met for maintenance. + */ + @Test + public void testUnderReplicatedDueToMaintenance() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.CLOSED, State.MAINTENANCE, State.MAINTENANCE); + assertReplicaScheduled(1); + } + + /** + * ReplicationManager should not replicate an additional replica when if + * min replica for maintenance is 1 and another replica is available. + */ + @Test + public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws + SCMException, ContainerNotFoundException, InterruptedException { + replicationManager.stop(); + ReplicationManagerConfiguration newConf = + new ReplicationManagerConfiguration(); + newConf.setMaintenanceReplicaMinimum(1); + createReplicationManager(newConf); + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.CLOSED, State.MAINTENANCE, State.MAINTENANCE); + assertReplicaScheduled(0); + } + + /** + * ReplicationManager should replicate an additional replica when all copies + * are going off line and min rep is 1. + */ + @Test + public void testUnderReplicatedDueToMaintenanceMinRepOne() throws + SCMException, ContainerNotFoundException, InterruptedException { + replicationManager.stop(); + ReplicationManagerConfiguration newConf = + new ReplicationManagerConfiguration(); + newConf.setMaintenanceReplicaMinimum(1); + createReplicationManager(newConf); + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE); + assertReplicaScheduled(1); + } + + /** + * ReplicationManager should replicate additional replica when all copies + * are going into maintenance. + */ + @Test + public void testUnderReplicatedDueToAllMaintenance() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.MAINTENANCE, State.MAINTENANCE, State.MAINTENANCE); + assertReplicaScheduled(2); + } + + /** + * ReplicationManager should not replicate additional replica sufficient + * replica are available. 
+ */ + @Test + public void testCorrectlyReplicatedWithMaintenance() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.CLOSED, State.CLOSED, State.MAINTENANCE, State.MAINTENANCE); + assertReplicaScheduled(0); + } + + /** + * ReplicationManager should replicate additional replica when all copies + * are decommissioning or maintenance. + */ + @Test + public void testUnderReplicatedWithDecommissionAndMaintenance() throws + SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.DECOMMISSIONED, State.DECOMMISSIONED, State.MAINTENANCE, + State.MAINTENANCE); + assertReplicaScheduled(2); + } + + /** + * When a CLOSED container is over replicated, ReplicationManager + * deletes the excess replicas. While choosing the replica for deletion + * ReplicationManager should not attempt to remove a DECOMMISSION or + * MAINTENANCE replica. + */ + @Test + public void testOverReplicatedClosedContainerWithDecomAndMaint() + throws SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + State.DECOMMISSIONED, State.MAINTENANCE, + State.CLOSED, State.CLOSED, State.CLOSED, State.CLOSED); + + final int currentDeleteCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentDeleteCommandCount + 1, datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.deleteContainerCommand)); + // Get the DECOM and Maint replica and ensure none of them are scheduled + // for removal + Set decom = + containerStateManager.getContainerReplicas(container.containerID()) + .stream() + .filter(r -> r.getState() != State.CLOSED) + .collect(Collectors.toSet()); + for (ContainerReplica r : decom) { + Assert.assertFalse(datanodeCommandHandler.received( + SCMCommandProto.Type.deleteContainerCommand, + r.getDatanodeDetails())); + } + } + + /** + * Replication Manager should not attempt to replicate from an unhealthy + * (stale or dead) node. To test this, setup a scenario where a replia needs + * to be created, but mark all nodes stale. That way, no new replica will be + * scheduled. + */ + @Test + public void testUnderReplicatedNotHealthySource() + throws SCMException, ContainerNotFoundException, InterruptedException { + final ContainerInfo container = setupReplicas(LifeCycleState.CLOSED, + NodeStatus.inServiceStale(), + State.CLOSED, State.DECOMMISSIONED, State.DECOMMISSIONED); + // There should be replica scheduled, but as all nodes are stale, nothing + // gets scheduled. + assertReplicaScheduled(0); + } + + private ContainerInfo setupReplicas( + LifeCycleState containerState, State... states) + throws SCMException, ContainerNotFoundException { + return setupReplicas(containerState, NodeStatus.inServiceHealthy(), states); + } + + private ContainerInfo setupReplicas( + LifeCycleState containerState, NodeStatus allNodesStatus, State... 
states) + throws SCMException, ContainerNotFoundException { + final ContainerInfo container = getContainer(containerState); + final ContainerID id = container.containerID(); + containerStateManager.loadContainer(container); + final UUID originNodeId = UUID.randomUUID(); + + for (State s : states) { + DatanodeDetails dn = randomDatanodeDetails(); + nodeManager.register(dn, allNodesStatus); + final ContainerReplica replica = getReplicas( + id, s, 1000L, originNodeId, dn); + containerStateManager.updateContainerReplica(id, replica); + } + return container; + } + + private void assertReplicaScheduled(int delta) throws InterruptedException { + final int currentReplicateCommandCount = datanodeCommandHandler + .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand); + + replicationManager.processContainersNow(); + // Wait for EventQueue to call the event handler + Thread.sleep(100L); + Assert.assertEquals(currentReplicateCommandCount + delta, + datanodeCommandHandler.getInvocationCount( + SCMCommandProto.Type.replicateContainerCommand)); + } + @After public void teardown() throws IOException { containerStateManager.close(); @@ -659,4 +900,180 @@ private boolean received(final SCMCommandProto.Type type, } } + private class SimpleNodeManager implements NodeManager { + + private Map nodeMap = new HashMap(); + + public void register(DatanodeDetails dd, NodeStatus status) { + nodeMap.put(dd.getUuid(), new DatanodeInfo(dd, status)); + } + + /** + * If the given node was registed with the nodeManager, return the + * NodeStatus for the node. Otherwise return a NodeStatus of "In Service + * and Healthy". + * @param datanodeDetails DatanodeDetails + * @return The NodeStatus of the node if it is registered, otherwise an + * Inservice and Healthy NodeStatus. + */ + @Override + public NodeStatus getNodeStatus(DatanodeDetails datanodeDetails) { + DatanodeInfo dni = nodeMap.get(datanodeDetails.getUuid()); + if (dni != null) { + return dni.getNodeStatus(); + } else { + return NodeStatus.inServiceHealthy(); + } + } + + /** + * Below here, are all auto-generate placeholder methods to implement the + * interface. 
+ */ + @Override + public List getNodes(NodeStatus nodeStatus) { + return null; + } + + @Override + public List getNodes( + HddsProtos.NodeOperationalState opState, HddsProtos.NodeState health) { + return null; + } + + @Override + public int getNodeCount(NodeStatus nodeStatus) { + return 0; + } + + @Override + public int getNodeCount(HddsProtos.NodeOperationalState opState, + HddsProtos.NodeState health) { + return 0; + } + + @Override + public List getAllNodes() { + return null; + } + + @Override + public SCMNodeStat getStats() { + return null; + } + + @Override + public Map getNodeStats() { + return null; + } + + @Override + public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) { + return null; + } + + @Override + public void setNodeOperationalState(DatanodeDetails datanodeDetails, + HddsProtos.NodeOperationalState newState) throws NodeNotFoundException { + } + + @Override + public Set getPipelines(DatanodeDetails datanodeDetails) { + return null; + } + + @Override + public void addPipeline(Pipeline pipeline) { + } + + @Override + public void removePipeline(Pipeline pipeline) { + } + + @Override + public void addContainer(DatanodeDetails datanodeDetails, + ContainerID containerId) throws NodeNotFoundException { + } + + @Override + public void setContainers(DatanodeDetails datanodeDetails, + Set containerIds) throws NodeNotFoundException { + } + + @Override + public Set getContainers(DatanodeDetails datanodeDetails) + throws NodeNotFoundException { + return null; + } + + @Override + public void addDatanodeCommand(UUID dnId, SCMCommand command) { + } + + @Override + public void processNodeReport(DatanodeDetails datanodeDetails, + StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport) { + } + + @Override + public List getCommandQueue(UUID dnID) { + return null; + } + + @Override + public DatanodeDetails getNodeByUuid(String uuid) { + return null; + } + + @Override + public List getNodesByAddress(String address) { + return null; + } + + @Override + public void close() throws IOException { + + } + + @Override + public Map getNodeCount() { + return null; + } + + @Override + public Map getNodeInfo() { + return null; + } + + @Override + public void onMessage(CommandForDatanode commandForDatanode, + EventPublisher publisher) { + } + + @Override + public VersionResponse getVersion( + StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto + versionRequest) { + return null; + } + + @Override + public RegisteredCommand register(DatanodeDetails datanodeDetails, + StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport, + StorageContainerDatanodeProtocolProtos.PipelineReportsProto + pipelineReport) { + return null; + } + + @Override + public List processHeartbeat(DatanodeDetails datanodeDetails) { + return null; + } + + @Override + public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { + return null; + } + } + } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index 003035cf0694..abf7f9ff22cb 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -47,17 
+47,11 @@ import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; import org.hamcrest.MatcherAssert; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import static org.mockito.Matchers.anyObject; -import org.mockito.Mockito; import static org.mockito.Mockito.when; /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java new file mode 100644 index 000000000000..9a502327792e --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestContainerReplicaCount.java @@ -0,0 +1,333 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.states; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount; +import org.junit.Before; +import org.junit.Test; +import java.util.*; +import static junit.framework.TestCase.assertEquals; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State + .DECOMMISSIONED; +import static org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State + .MAINTENANCE; + +/** + * Class used to test the ContainerReplicaCount class. + */ +public class TestContainerReplicaCount { + + @Before + public void setup() { + } + + @Test + public void testThreeHealthyReplica() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, true, 0, false); + } + + @Test + public void testTwoHealthyReplica() { + Set replica = registerNodes(CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testOneHealthyReplica() { + Set replica = registerNodes(CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 2, false); + } + + @Test + public void testTwoHealthyAndInflightAdd() { + Set replica = registerNodes(CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2); + validate(rcnt, false, 0, false); + } + + @Test + /** + * This does not schedule a container to be removed, as the inFlight add may + * fail and then the delete would make things under-replicated. Once the add + * completes there will be 4 healthy and it will get taken care of then. + */ + public void testThreeHealthyAndInflightAdd() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2); + validate(rcnt, true, 0, false); + } + + @Test + /** + * As the inflight delete may fail, but as it will make the the container + * under replicated, we go ahead and schedule another replica to be added. + */ + public void testThreeHealthyAndInflightDelete() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + /** + * This is NOT sufficiently replicated as the inflight add may fail and the + * inflight del could succeed, leaving only 2 healthy replicas. 
+ */ + public void testThreeHealthyAndInflightAddAndInFlightDelete() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 1, 3, 2); + validate(rcnt, false, 0, false); + } + + @Test + public void testFourHealthyReplicas() { + Set replica = + registerNodes(CLOSED, CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, true, -1, true); + } + + @Test + public void testFourHealthyReplicasAndInFlightDelete() { + Set replica = + registerNodes(CLOSED, CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 3, 2); + validate(rcnt, true, 0, false); + } + + @Test + public void testFourHealthyReplicasAndTwoInFlightDelete() { + Set replica = + registerNodes(CLOSED, CLOSED, CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 2, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testOneHealthyReplicaRepFactorOne() { + Set replica = registerNodes(CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2); + validate(rcnt, true, 0, false); + } + + @Test + public void testOneHealthyReplicaRepFactorOneInFlightDelete() { + Set replica = registerNodes(CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 1, 1, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testTwoHealthyReplicaTwoInflightAdd() { + Set replica = registerNodes(CLOSED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 2, 0, 3, 2); + validate(rcnt, false, 0, false); + } + + /** + * From here consider decommission replicas. + */ + + @Test + public void testThreeHealthyAndTwoDecommission() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED, + DECOMMISSIONED, DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, true, 0, false); + } + + @Test + public void testOneDecommissionedReplica() { + Set replica = + registerNodes(CLOSED, CLOSED, DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testTwoHealthyOneDecommissionedneInFlightAdd() { + Set replica = + registerNodes(CLOSED, CLOSED, DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2); + validate(rcnt, false, 0, false); + } + + @Test + public void testAllDecommissioned() { + Set replica = + registerNodes(DECOMMISSIONED, DECOMMISSIONED, DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 3, false); + } + + @Test + public void testAllDecommissionedRepFactorOne() { + Set replica = registerNodes(DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testAllDecommissionedRepFactorOneInFlightAdd() { + Set replica = registerNodes(DECOMMISSIONED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2); + validate(rcnt, false, 0, false); + } + + @Test + public void testOneHealthyOneDecommissioningRepFactorOne() { + Set replica = registerNodes(DECOMMISSIONED, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2); + validate(rcnt, true, 0, false); + } + + /** + * Maintenance tests from here. 
+ */ + + @Test + public void testOneHealthyTwoMaintenanceMinRepOfTwo() { + Set replica = + registerNodes(CLOSED, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testOneHealthyThreeMaintenanceMinRepOfTwo() { + Set replica = registerNodes(CLOSED, + MAINTENANCE, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testOneHealthyTwoMaintenanceMinRepOfOne() { + Set replica = + registerNodes(CLOSED, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 1); + validate(rcnt, true, 0, false); + } + + @Test + public void testOneHealthyThreeMaintenanceMinRepOfTwoInFlightAdd() { + Set replica = registerNodes(CLOSED, + MAINTENANCE, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2); + validate(rcnt, false, 0, false); + } + + @Test + public void testAllMaintenance() { + Set replica = + registerNodes(MAINTENANCE, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, false, 2, false); + } + + @Test + /** + * As we have exactly 3 healthy, but then an excess of maintenance copies + * we ignore the over-replication caused by the maintenance copies until they + * come back online, and then deal with them. + */ + public void testThreeHealthyTwoInMaintenance() { + Set replica = registerNodes(CLOSED, CLOSED, CLOSED, + MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, true, 0, false); + } + + @Test + /** + * This is somewhat similar to testThreeHealthyTwoInMaintenance() except now + * one of the maintenance copies has become healthy and we will need to remove + * the over-replicated healthy container. 
+ */ + public void testFourHealthyOneInMaintenance() { + Set replica = + registerNodes(CLOSED, CLOSED, CLOSED, CLOSED, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 3, 2); + validate(rcnt, true, -1, true); + } + + @Test + public void testOneMaintenanceMinRepOfTwoRepFactorOne() { + Set replica = registerNodes(MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2); + validate(rcnt, false, 1, false); + } + + @Test + public void testOneMaintenanceMinRepOfTwoRepFactorOneInFlightAdd() { + Set replica = registerNodes(MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 1, 2); + validate(rcnt, false, 0, false); + } + + @Test + public void testOneHealthyOneMaintenanceRepFactorOne() { + Set replica = registerNodes(MAINTENANCE, CLOSED); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 0, 0, 1, 2); + validate(rcnt, true, 0, false); + } + + @Test + public void testTwoDecomTwoMaintenanceOneInflightAdd() { + Set replica = + registerNodes(DECOMMISSIONED, DECOMMISSIONED, MAINTENANCE, MAINTENANCE); + ContainerReplicaCount rcnt = new ContainerReplicaCount(replica, 1, 0, 3, 2); + validate(rcnt, false, 1, false); + } + + private void validate(ContainerReplicaCount rcnt, + boolean sufficientlyReplicated, int replicaDelta, boolean overRelicated) { + assertEquals(sufficientlyReplicated, rcnt.isSufficientlyReplicated()); + assertEquals(replicaDelta, rcnt.additionalReplicaNeeded()); + } + + private Set registerNodes( + ContainerReplicaProto.State... states) { + Set replica = new HashSet<>(); + for (ContainerReplicaProto.State s : states) { + DatanodeDetails dn = TestUtils.randomDatanodeDetails(); + replica.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(new ContainerID(1)) + .setContainerState(s) + .setDatanodeDetails(dn) + .setOriginNodeId(dn.getUuid()) + .setSequenceId(1) + .build()); + } + return replica; + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java index 0aa02218dfca..acd89937b5f5 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java @@ -34,6 +34,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -127,7 +128,9 @@ public void testNodeCanBeQueuedAndCancelled() { } + @Test + @Ignore // HDDS-2631 public void testMonitoredNodeHasPipelinesClosed() throws NodeNotFoundException, TimeoutException, InterruptedException { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java index 5572e9aa1ef4..a2587a733a6a 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.container.ContainerManager; +import 
org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; @@ -54,6 +56,7 @@ public class TestSafeModeHandler { private EventQueue eventQueue; private SCMSafeModeManager.SafeModeStatus safeModeStatus; private PipelineManager scmPipelineManager; + private NodeManager nodeManager; public void setup(boolean enabled) { configuration = new OzoneConfiguration(); @@ -68,10 +71,11 @@ public void setup(boolean enabled) { Mockito.mock(ContainerManager.class); Mockito.when(containerManager.getContainerIDs()) .thenReturn(new HashSet<>()); + nodeManager = new MockNodeManager(false, 0); replicationManager = new ReplicationManager( new ReplicationManagerConfiguration(), containerManager, Mockito.mock(ContainerPlacementPolicy.class), - eventQueue, new LockManager(configuration)); + eventQueue, new LockManager(configuration), nodeManager); scmPipelineManager = Mockito.mock(SCMPipelineManager.class); blockManager = Mockito.mock(BlockManagerImpl.class); safeModeHandler = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java index 5cf086409f1f..159683c1768e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java @@ -20,7 +20,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.ozone.MiniOzoneCluster;