diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 290365364a5b..d6470551574b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -40,6 +41,8 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -50,7 +53,8 @@
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -135,6 +139,64 @@ public class ReplicationManager implements MetricsSource, SCMService {
    */
   private final Map<ContainerID, List<InflightAction>> inflightDeletion;
 
+  /**
+   * This is used for tracking container move commands
+   * which are not yet complete.
+   */
+  private final Map<ContainerID,
+      Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
+
+  /**
+   * This is used for indicating the result of a move operation and
+   * the corresponding reason.
+   * It is useful for tracking the result of a move operation.
+   */
+  enum MoveResult {
+    // both replication and deletion are completed
+    COMPLETED,
+    // RM is not running
+    RM_NOT_RUNNING,
+    // replication fails because the container does not exist in src
+    REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
+    // replication fails because the container exists in target
+    REPLICATION_FAIL_EXIST_IN_TARGET,
+    // replication fails because the container is not closed
+    REPLICATION_FAIL_CONTAINER_NOT_CLOSED,
+    // replication fails because the container is in inflightDeletion
+    REPLICATION_FAIL_INFLIGHT_DELETION,
+    // replication fails because the container is in inflightReplication
+    REPLICATION_FAIL_INFLIGHT_REPLICATION,
+    // replication fails because of timeout
+    REPLICATION_FAIL_TIME_OUT,
+    // replication fails because the node is not in service
+    REPLICATION_FAIL_NODE_NOT_IN_SERVICE,
+    // replication fails because the node is unhealthy
+    REPLICATION_FAIL_NODE_UNHEALTHY,
+    // replication succeeds, but deletion fails because of timeout
+    DELETION_FAIL_TIME_OUT,
+    // replication succeeds, but deletion fails because
+    // the node is unhealthy
+    DELETION_FAIL_NODE_UNHEALTHY,
+    // replication succeeds, but if we delete the container from
+    // the source datanode, the policy (e.g., replica num or
+    // rack location) will not be satisfied, so we should not delete
+    // the container
+    DELETE_FAIL_POLICY,
+    // replicas + target - src does not satisfy placement policy
+    PLACEMENT_POLICY_NOT_SATISFIED,
+    // unexpected action: src removed from inflightReplication
+    UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
+    // unexpected action: target removed from inflightDeletion
+    UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION
+  }
+
+  /**
+   * This is used for tracking the completion status of container
+   * move commands which are not yet complete.
+   */
+  private final Map<ContainerID,
+      CompletableFuture<MoveResult>> inflightMoveFuture;
+
   /**
    * ReplicationManager specific configuration.
    */
@@ -194,6 +256,8 @@ public ReplicationManager(final ConfigurationSource conf,
     this.running = false;
     this.inflightReplication = new ConcurrentHashMap<>();
     this.inflightDeletion = new ConcurrentHashMap<>();
+    this.inflightMove = new ConcurrentHashMap<>();
+    this.inflightMoveFuture = new ConcurrentHashMap<>();
     this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
     this.clock = clock;
 
@@ -214,7 +278,6 @@ public ReplicationManager(final ConfigurationSource conf,
    */
   @Override
   public synchronized void start() {
-
     if (!isRunning()) {
       DefaultMetricsSystem.instance().register(METRICS_SOURCE_NAME,
           "SCM Replication manager (closed container replication) related "
@@ -264,6 +327,9 @@ public synchronized void stop() {
     LOG.info("Stopping Replication Monitor Thread.");
     inflightReplication.clear();
     inflightDeletion.clear();
+    //TODO: replicate inflight move through ratis
+    inflightMove.clear();
+    inflightMoveFuture.clear();
     running = false;
     DefaultMetricsSystem.instance().unregisterSource(METRICS_SOURCE_NAME);
     notifyAll();
@@ -458,13 +524,17 @@ private void updateInflightAction(final ContainerInfo container,
       try {
         InflightAction a = iter.next();
         NodeStatus status = nodeManager.getNodeStatus(a.datanode);
-        NodeState state = status.getHealth();
-        NodeOperationalState opState = status.getOperationalState();
-        if (state != NodeState.HEALTHY || a.time < deadline ||
-            filter.test(a) || opState != NodeOperationalState.IN_SERVICE) {
+        boolean isUnhealthy = status.getHealth() != NodeState.HEALTHY;
+        boolean isCompleted = filter.test(a);
+        boolean isTimeout = a.time < deadline;
+        boolean isNotInService = status.getOperationalState() !=
+            NodeOperationalState.IN_SERVICE;
+        if (isCompleted || isUnhealthy || isTimeout || isNotInService) {
           iter.remove();
+          updateMoveIfNeeded(isUnhealthy, isCompleted, isTimeout,
+              container, a.datanode, inflightActions);
         }
-      } catch (NodeNotFoundException e) {
+      } catch (NodeNotFoundException | ContainerNotFoundException e) {
         // Should not happen, but if it does, just remove the action as the
         // node somehow does not exist;
         iter.remove();
@@ -476,6 +546,247 @@ private void updateInflightAction(final ContainerInfo container,
     }
   }
 
+  /**
+   * Update inflight move if needed.
+   *
+   * @param isUnhealthy is the datanode unhealthy
+   * @param isCompleted is the action completed
+   * @param isTimeout has the action timed out
+   * @param container Container to update
+   * @param dn datanode which is removed from the inflightActions
+   * @param inflightActions inflightReplication (or) inflightDeletion
+   */
+  private void updateMoveIfNeeded(final boolean isUnhealthy,
+      final boolean isCompleted, final boolean isTimeout,
+      final ContainerInfo container, final DatanodeDetails dn,
+      final Map<ContainerID, List<InflightAction>> inflightActions)
+      throws ContainerNotFoundException {
+    // make sure inflightMove contains the container
+    ContainerID id = container.containerID();
+    if (!inflightMove.containsKey(id)) {
+      return;
+    }
+
+    // make sure the datanode which is removed from inflightActions
+    // is the source or target datanode.
+    Pair<DatanodeDetails, DatanodeDetails> kv = inflightMove.get(id);
+    final boolean isSource = kv.getKey().equals(dn);
+    final boolean isTarget = kv.getValue().equals(dn);
+    if (!isSource && !isTarget) {
+      return;
+    }
+    final boolean isInflightReplication =
+        inflightActions.equals(inflightReplication);
+
+    /*
+     * there are several cases:
+     **********************************************************
+     *               * InflightReplication * InflightDeletion *
+     **********************************************************
+     *source removed *      unexpected     *     expected     *
+     **********************************************************
+     *target removed *       expected      *    unexpected    *
+     **********************************************************
+     * an unexpected action may happen somehow. to make it deterministic,
+     * if an unexpected action happens, we just fail the completableFuture.
+     */
+
+    if (isSource && isInflightReplication) {
+      inflightMoveFuture.get(id).complete(
+          MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+      return;
+    }
+
+    if (isTarget && !isInflightReplication) {
+      inflightMoveFuture.get(id).complete(
+          MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+      return;
+    }
+
+    if (!(isInflightReplication && isCompleted)) {
+      if (isInflightReplication) {
+        if (isUnhealthy) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        } else {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.REPLICATION_FAIL_TIME_OUT);
+        }
+      } else {
+        if (isUnhealthy) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+        } else if (isTimeout) {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.DELETION_FAIL_TIME_OUT);
+        } else {
+          inflightMoveFuture.get(id).complete(
+              MoveResult.COMPLETED);
+        }
+      }
+      inflightMove.remove(id);
+      inflightMoveFuture.remove(id);
+    } else {
+      deleteSrcDnForMove(container,
+          containerManager.getContainerReplicas(id));
+    }
+  }
+
+  /**
+   * Add a move action for a given container.
+   *
+   * @param cid Container to move
+   * @param srcDn datanode to move from
+   * @param targetDn datanode to move to
+   */
+  public CompletableFuture<MoveResult> move(ContainerID cid,
+      DatanodeDetails srcDn, DatanodeDetails targetDn)
+      throws ContainerNotFoundException, NodeNotFoundException {
+    CompletableFuture<MoveResult> ret = new CompletableFuture<>();
+    if (!isRunning()) {
+      ret.complete(MoveResult.RM_NOT_RUNNING);
+      return ret;
+    }
+
+    /*
+     * make sure the following conditions are met:
+     * 1 the given two datanodes are in healthy state
+     * 2 the given container exists on the given source datanode
+     * 3 the given container does not exist on the given target datanode
+     * 4 the given container is in closed state
+     * 5 the given container is not taking any inflight action
+     * 6 the given two datanodes are in IN_SERVICE state
+     * 7 {Existing replicas + Target_Dn - Source_Dn} satisfies
+     *   the placement policy
+     *
+     * move is a combination of two steps: replication and deletion.
+     * if the conditions above are all met, then we take a conservative
+     * strategy here: replication can always be executed, but the execution
+     * of deletion always depends on the placement policy
+     */
+
+    NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
+    NodeState healthStat = currentNodeStat.getHealth();
+    NodeOperationalState operationalState =
+        currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      return ret;
+    }
+
+    currentNodeStat = nodeManager.getNodeStatus(targetDn);
+    healthStat = currentNodeStat.getHealth();
+    operationalState = currentNodeStat.getOperationalState();
+    if (healthStat != NodeState.HEALTHY) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      return ret;
+    }
+    if (operationalState != NodeOperationalState.IN_SERVICE) {
+      ret.complete(MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      return ret;
+    }
+
+    // we need to synchronize on ContainerInfo, since it is
+    // shared by ICR/FCR handler and this.processContainer
+    // TODO: use a Read lock after introducing a RW lock into ContainerInfo
+    ContainerInfo cif = containerManager.getContainer(cid);
+    synchronized (cif) {
+      final Set<ContainerReplica> currentReplicas = containerManager
+          .getContainerReplicas(cid);
+      final Set<DatanodeDetails> replicas = currentReplicas.stream()
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toSet());
+      if (replicas.contains(targetDn)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+        return ret;
+      }
+      if (!replicas.contains(srcDn)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+        return ret;
+      }
+
+      /*
+       * the reason why the given container should not be taking any inflight
+       * action is that: if the given container is being replicated or
+       * deleted, the number of its replicas is not deterministic, so a move
+       * operation issued by the balancer may cause a nondeterministic
+       * result, so we should reject this move for now.
+       */
+
+      if (inflightReplication.containsKey(cid)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+        return ret;
+      }
+      if (inflightDeletion.containsKey(cid)) {
+        ret.complete(MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+        return ret;
+      }
+
+      /*
+       * here, no need to check whether cid is in inflightMove, because
+       * these three maps are all synchronized on ContainerInfo: if cid
+       * is in inflightMove, it must currently be being replicated or
+       * deleted, so it must be in inflightReplication or in
+       * inflightDeletion. thus, if we cannot find cid in either of them,
+       * this cid must not be in inflightMove.
+       */
+
+      LifeCycleState currentContainerStat = cif.getState();
+      if (currentContainerStat != LifeCycleState.CLOSED) {
+        ret.complete(MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+        return ret;
+      }
+
+      // check whether {Existing replicas + Target_Dn - Source_Dn}
+      // satisfies current placement policy
+      if (!isPolicySatisfiedAfterMove(cif, srcDn, targetDn,
+          currentReplicas.stream().collect(Collectors.toList()))) {
+        ret.complete(MoveResult.PLACEMENT_POLICY_NOT_SATISFIED);
+        return ret;
+      }
+
+      inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn));
+      inflightMoveFuture.putIfAbsent(cid, ret);
+      sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn));
+    }
+    LOG.info("receive a move request about container {} , from {} to {}",
+        cid, srcDn.getUuid(), targetDn.getUuid());
+    return ret;
+  }
+
+  /**
+   * Returns whether {Existing replicas + Target_Dn - Source_Dn}
+   * satisfies current placement policy.
+   * @param cif Container Info of moved container
+   * @param srcDn DatanodeDetails of source data node
+   * @param targetDn DatanodeDetails of target data node
+   * @param replicas container replicas
+   * @return whether the placement policy is satisfied after move
+   */
+  private boolean isPolicySatisfiedAfterMove(ContainerInfo cif,
+      DatanodeDetails srcDn, DatanodeDetails targetDn,
+      final List<ContainerReplica> replicas) {
+    Set<ContainerReplica> movedReplicas =
+        replicas.stream().collect(Collectors.toSet());
+    movedReplicas.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+    movedReplicas.add(ContainerReplica.newBuilder()
+        .setDatanodeDetails(targetDn)
+        .setContainerID(cif.containerID())
+        .setContainerState(State.CLOSED).build());
+    ContainerPlacementStatus placementStatus = getPlacementStatus(
+        movedReplicas, cif.getReplicationConfig().getRequiredNodes());
+    return placementStatus.isPolicySatisfied();
+  }
+
   /**
    * Returns the number replica which are pending creation for the given
    * container ID.
@@ -864,47 +1175,129 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
           break;
         }
       }
-      // After removing all unhealthy replicas, if the container is still over
-      // replicated then we need to check if it is already mis-replicated.
-      // If it is, we do no harm by removing excess replicas. However, if it is
-      // not mis-replicated, then we can only remove replicas if they don't
-      // make the container become mis-replicated.
-      if (excess > 0) {
-        eligibleReplicas.removeAll(unhealthyReplicas);
-        Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
-        ContainerPlacementStatus ps =
-            getPlacementStatus(eligibleSet, replicationFactor);
-        for (ContainerReplica r : eligibleReplicas) {
-          if (excess <= 0) {
-            break;
-          }
-          // First remove the replica we are working on from the set, and then
-          // check if the set is now mis-replicated.
-          eligibleSet.remove(r);
-          ContainerPlacementStatus nowPS =
-              getPlacementStatus(eligibleSet, replicationFactor);
-          if ((!ps.isPolicySatisfied()
                && nowPS.actualPlacementCount() == ps.actualPlacementCount())
-              || (ps.isPolicySatisfied() && nowPS.isPolicySatisfied())) {
-            // Remove the replica if the container was already unsatisfied
-            // and losing this replica keep actual placement count unchanged.
-            // OR if losing this replica still keep satisfied
-            sendDeleteCommand(container, r.getDatanodeDetails(), true);
-            excess -= 1;
-            continue;
-          }
-          // If we decided not to remove this replica, put it back into the set
-          eligibleSet.add(r);
+      eligibleReplicas.removeAll(unhealthyReplicas);
+      removeExcessReplicasIfNeeded(excess, container, eligibleReplicas);
+    }
+  }
+
+  /**
+   * If the container is in inflightMove, handle the move.
+   * This function assumes replication has been completed.
+   *
+   * @param cif ContainerInfo
+   * @param replicaSet A Set of replicas, which may have excess replicas
+   */
+  private void deleteSrcDnForMove(final ContainerInfo cif,
+      final Set<ContainerReplica> replicaSet) {
+    final ContainerID cid = cif.containerID();
+    if (inflightMove.containsKey(cid)) {
+      Pair<DatanodeDetails, DatanodeDetails> movePair =
+          inflightMove.get(cid);
+      final DatanodeDetails srcDn = movePair.getKey();
+      ContainerReplicaCount replicaCount =
+          getContainerReplicaCount(cif, replicaSet);
+
+      if (!replicaSet.stream()
+          .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))) {
+        // if the target is present but the source disappears somehow,
+        // we can consider the move successful.
+        inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED);
+        inflightMove.remove(cid);
+        inflightMoveFuture.remove(cid);
+        return;
+      }
+
+      int replicationFactor =
+          cif.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus currentCPS =
+          getPlacementStatus(replicaSet, replicationFactor);
+      Set<ContainerReplica> newReplicaSet = replicaSet
+          .stream().collect(Collectors.toSet());
+      newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+      ContainerPlacementStatus newCPS =
+          getPlacementStatus(newReplicaSet, replicationFactor);
+
+      if (replicaCount.isOverReplicated() &&
+          isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
+        sendDeleteCommand(cif, srcDn, true);
+      } else {
+        // if the source and target datanodes are both in the replica set,
+        // but we cannot delete the source replica for now (e.g., there
+        // are only 3 replicas, or the policy would not be satisfied, etc.),
+        // we just complete the future without sending a delete command.
+        LOG.info("can not remove source replica after successfully " +
+            "replicated to target datanode");
+        inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY);
+        inflightMove.remove(cid);
+        inflightMoveFuture.remove(cid);
+      }
+    }
+  }
+
+  /**
+   * Remove excess replicas if needed; replicationFactor and placement
+   * policy will be taken into consideration.
+   *
+   * @param excess the excess number after subtracting replicationFactor
+   * @param container ContainerInfo
+   * @param eligibleReplicas A list of replicas, which may have excess replicas
+   */
+  private void removeExcessReplicasIfNeeded(int excess,
+      final ContainerInfo container,
+      final List<ContainerReplica> eligibleReplicas) {
+    // After removing all unhealthy replicas, if the container is still over
+    // replicated then we need to check if it is already mis-replicated.
+    // If it is, we do no harm by removing excess replicas. However, if it is
+    // not mis-replicated, then we can only remove replicas if they don't
+    // make the container become mis-replicated.
+    if (excess > 0) {
+      Set<ContainerReplica> eligibleSet = new HashSet<>(eligibleReplicas);
+      final int replicationFactor =
+          container.getReplicationConfig().getRequiredNodes();
+      ContainerPlacementStatus ps =
+          getPlacementStatus(eligibleSet, replicationFactor);
+
+      for (ContainerReplica r : eligibleReplicas) {
+        if (excess <= 0) {
+          break;
+        }
-        }
-        if (excess > 0) {
-          LOG.info("The container {} is over replicated with {} excess " +
-              "replica. The excess replicas cannot be removed without " +
-              "violating the placement policy", container, excess);
+        // First remove the replica we are working on from the set, and then
+        // check if the set is now mis-replicated.
+        eligibleSet.remove(r);
+        ContainerPlacementStatus nowPS =
+            getPlacementStatus(eligibleSet, replicationFactor);
+        if (isPlacementStatusActuallyEqual(ps, nowPS)) {
+          // Remove the replica if the container was already unsatisfied
+          // and losing this replica keeps the actual placement count
+          // unchanged, OR if losing this replica still keeps the policy
+          // satisfied
+          sendDeleteCommand(container, r.getDatanodeDetails(), true);
+          excess -= 1;
+          continue;
+        }
+        // If we decided not to remove this replica, put it back into the set
+        eligibleSet.add(r);
+      }
+      if (excess > 0) {
+        LOG.info("The container {} is over replicated with {} excess " +
+            "replica. The excess replicas cannot be removed without " +
+            "violating the placement policy", container, excess);
+      }
+    }
+  }
+
+  /**
+   * Whether the given two ContainerPlacementStatus are actually equal.
+   *
+   * @param cps1 ContainerPlacementStatus
+   * @param cps2 ContainerPlacementStatus
+   */
+  private boolean isPlacementStatusActuallyEqual(
+      ContainerPlacementStatus cps1,
+      ContainerPlacementStatus cps2) {
+    return cps1.actualPlacementCount() == cps2.actualPlacementCount() ||
+        cps1.isPolicySatisfied() && cps2.isPolicySatisfied();
+  }
+
   /**
    * Given a set of ContainerReplica, transform it to a list of DatanodeDetails
    * and then check if the list meets the container placement policy.
@@ -915,7 +1308,8 @@ private void handleOverReplicatedContainer(final ContainerInfo container,
   private ContainerPlacementStatus getPlacementStatus(
       Set<ContainerReplica> replicas, int replicationFactor) {
     List<DatanodeDetails> replicaDns = replicas.stream()
-        .map(c -> c.getDatanodeDetails()).collect(Collectors.toList());
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
     return containerPlacement.validateContainerPlacement(
         replicaDns, replicationFactor);
   }
@@ -1155,6 +1549,8 @@ public void getMetrics(MetricsCollector collector, boolean all) {
             inflightReplication.size())
         .addGauge(ReplicationManagerMetrics.INFLIGHT_DELETION,
             inflightDeletion.size())
+        .addGauge(ReplicationManagerMetrics.INFLIGHT_MOVE,
+            inflightMove.size())
         .endRecord();
   }
 
@@ -1250,7 +1646,8 @@ public int getMaintenanceReplicaMinimum() {
   public enum ReplicationManagerMetrics implements MetricsInfo {
 
     INFLIGHT_REPLICATION("Tracked inflight container replication requests."),
-    INFLIGHT_DELETION("Tracked inflight container deletion requests.");
+    INFLIGHT_DELETION("Tracked inflight container deletion requests."),
+    INFLIGHT_MOVE("Tracked inflight container move requests.");
 
     private final String desc;
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 38b06e12ce5b..d01aafc0b84f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
@@ -31,6 +32,7 @@
     .ReplicationManagerConfiguration;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -60,6 +62,8 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +77,8 @@
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.STALE;
-import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
 import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
 import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
@@ -92,6 +97,7 @@ public class TestReplicationManager {
   private ContainerManagerV2 containerManager;
   private OzoneConfiguration conf;
   private SCMNodeManager scmNodeManager;
+  private GenericTestUtils.LogCapturer scmLogs;
   private TestClock clock;
 
   @Before
@@ -102,6 +108,7 @@ public void setup()
         HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
         0, TimeUnit.SECONDS);
 
+    scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
     containerManager = Mockito.mock(ContainerManagerV2.class);
     nodeManager = new SimpleMockNodeManager();
     eventQueue = new EventQueue();
@@ -178,6 +185,7 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf)
         clock);
 
     serviceManager.notifyStatusChanged();
+    scmLogs.clearOutput();
     Thread.sleep(100L);
   }
 
@@ -641,7 +649,7 @@ public void testHealthyClosedContainer()
       throws SCMException, ContainerNotFoundException, InterruptedException {
     final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
     final ContainerID id = container.containerID();
-    final Set<ContainerReplica> replicas = getReplicas(id, CLOSED,
+    final Set<ContainerReplica> replicas = getReplicas(id, State.CLOSED,
         randomDatanodeDetails(),
         randomDatanodeDetails(),
         randomDatanodeDetails());
@@ -1086,6 +1094,245 @@ public void testUnderReplicatedNotHealthySource()
     assertReplicaScheduled(0);
   }
 
+  /**
+   * If all the prerequisites are satisfied, move should work as expected.
+   */
+  @Test
+  public void testMove() throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+    Thread.sleep(100L);
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.replicateContainerCommand, dn3));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+    //replicate container to dn3
+    addReplicaToDn(container, dn3, CLOSED);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.deleteContainerCommand));
+    containerStateManager.removeContainerReplica(id, dn1);
+
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.COMPLETED);
+  }
+
+  /**
+   * Make sure RM does not delete the replica if the placement policy is
+   * not satisfied.
+   */
+  @Test
+  public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
+      throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    ContainerReplica dn2 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+    Thread.sleep(100L);
+    Assert.assertTrue(datanodeCommandHandler.received(
+        SCMCommandProto.Type.replicateContainerCommand, dn4));
+    Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+        SCMCommandProto.Type.replicateContainerCommand));
+
+    //replicate container to dn4
+    addReplicaToDn(container, dn4, CLOSED);
+    //now, replication succeeds, but the replica on dn2 is lost,
+    //and there are only three replicas in total, so RM should
+    //not delete the replica on dn1
+    containerStateManager.removeContainerReplica(id, dn2);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertFalse(datanodeCommandHandler.received(
+        SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
+
+    Assert.assertTrue(cf.isDone() && cf.get() == MoveResult.DELETE_FAIL_POLICY);
+  }
+
+
+  /**
+   * Test the src and target datanodes becoming unhealthy while moving.
+   */
+  @Test
+  public void testDnBecameUnhealthyWhenMoving() throws SCMException,
+      NodeNotFoundException, InterruptedException, ExecutionException {
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    CompletableFuture<MoveResult> cf =
+        replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(scmLogs.getOutput().contains(
+        "receive a move request about container"));
+
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, STALE));
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    addReplicaToDn(container, dn3, CLOSED);
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+    nodeManager.setNodeStatus(dn1.getDatanodeDetails(),
+        new NodeStatus(IN_SERVICE, STALE));
+    replicationManager.processContainersNow();
+    Thread.sleep(100L);
+
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
+  }
+
+  /**
+   * Before Replication Manager generates a completableFuture for a move
+   * operation, some prerequisites should be satisfied.
+   */
+  @Test
+  public void testMovePrerequisites()
+      throws SCMException, NodeNotFoundException,
+      InterruptedException, ExecutionException {
+    //all conditions are met
+    final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+    ContainerID id = container.containerID();
+    ContainerReplica dn1 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    ContainerReplica dn2 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+    DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+    ContainerReplica dn4 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+
+    CompletableFuture<MoveResult> cf;
+    //the above moves were executed successfully, so there may be some items
+    //in inflightReplication or inflightDeletion. here we stop the replication
+    //manager to clear these states, which may impact the tests below.
+    //we don't need a running replicationManager now
+    replicationManager.stop();
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.RM_NOT_RUNNING);
+    replicationManager.start();
+    Thread.sleep(100L);
+
+    //container is not in CLOSED state
+    for (LifeCycleState state : LifeCycleState.values()) {
+      if (state != LifeCycleState.CLOSED) {
+        container.setState(state);
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+      }
+    }
+    container.setState(LifeCycleState.CLOSED);
+
+    //Node is not in healthy state
+    for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
+      if (state != HEALTHY) {
+        nodeManager.setNodeStatus(dn3,
+            new NodeStatus(IN_SERVICE, state));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
+      }
+    }
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+    //Node is not in IN_SERVICE state
+    for (HddsProtos.NodeOperationalState state :
+        HddsProtos.NodeOperationalState.values()) {
+      if (state != IN_SERVICE) {
+        nodeManager.setNodeStatus(dn3,
+            new NodeStatus(state, HEALTHY));
+        cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+        cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+        Assert.assertTrue(cf.isDone() && cf.get() ==
+            MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
+      }
+    }
+    nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
+
+    //container exists in target datanode
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(),
+        dn2.getDatanodeDetails());
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
+
+    //container does not exist in source datanode
+    cf = replicationManager.move(id, dn3, dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
+
+    replicationManager.start();
+    //make the container over-replicated to test the
+    // case that the container is in inflightDeletion
+    ContainerReplica dn5 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+    ContainerReplica dn6 = addReplica(container,
+        new NodeStatus(IN_SERVICE, HEALTHY), State.CLOSED);
+    replicationManager.processContainersNow();
+    //waiting for inflightDeletion generation
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
+    resetReplicationManager();
+
+    //make the replica num 2 to test the case
+    //that the container is in inflightReplication
+    containerStateManager.removeContainerReplica(id, dn6);
+    containerStateManager.removeContainerReplica(id, dn5);
+    containerStateManager.removeContainerReplica(id, dn4);
+    //the replication manager should generate inflightReplication
+    replicationManager.processContainersNow();
+    //waiting for inflightReplication generation
+    Thread.sleep(100L);
+    cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+    Assert.assertTrue(cf.isDone() && cf.get() ==
+        MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
+  }
+
   @Test
   public void testReplicateCommandTimeout()
       throws SCMException, InterruptedException {
@@ -1112,21 +1359,39 @@ private ContainerInfo createContainer(LifeCycleState containerState)
     return container;
   }
 
-  private ContainerReplica addReplica(ContainerInfo container,
-      NodeStatus nodeStatus, State replicaState)
-      throws ContainerNotFoundException {
+  private DatanodeDetails addNode(NodeStatus nodeStatus) {
     DatanodeDetails dn = randomDatanodeDetails();
     dn.setPersistedOpState(nodeStatus.getOperationalState());
     dn.setPersistedOpStateExpiryEpochSec(
         nodeStatus.getOpStateExpiryEpochSeconds());
     nodeManager.register(dn, nodeStatus);
+    return dn;
+  }
+
+  private void resetReplicationManager() throws InterruptedException {
+    replicationManager.stop();
+    Thread.sleep(100L);
+    replicationManager.start();
+    Thread.sleep(100L);
+  }
+
+  private ContainerReplica addReplica(ContainerInfo container,
+      NodeStatus nodeStatus, State replicaState)
+      throws ContainerNotFoundException {
+    DatanodeDetails dn = addNode(nodeStatus);
+    return addReplicaToDn(container, dn, replicaState);
+  }
+
+  private ContainerReplica addReplicaToDn(ContainerInfo container,
+      DatanodeDetails dn, State replicaState)
+      throws ContainerNotFoundException {
     // Using the same originID for all replica in the container set. If each
     // replica has a unique originID, it causes problems in ReplicationManager
     // when processing over-replicated containers.
     final UUID originNodeId =
         UUID.nameUUIDFromBytes(Longs.toByteArray(container.getContainerID()));
     final ContainerReplica replica = getReplicas(
-        container.containerID(), CLOSED, 1000L, originNodeId, dn);
+        container.containerID(), replicaState, 1000L, originNodeId, dn);
     containerStateManager
         .updateContainerReplica(container.containerID(), replica);
     return replica;
@@ -1196,5 +1461,4 @@ private boolean received(final SCMCommandProto.Type type,
           dc.getDatanodeId().equals(datanode.getUuid()));
     }
   }
-
 }
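
Reviewer note: to make the intended call pattern concrete, here is a minimal sketch of how an SCM-side caller (for example, the container balancer this API is being built for) might drive a move and react to the MoveResult. The class and method names below are hypothetical and are not part of this patch; the sketch sits in the same package only because MoveResult is package-private.

```java
package org.apache.hadoop.hdds.scm.container;

import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;

/**
 * Illustrative sketch only: drives ReplicationManager#move and
 * classifies the outcome. Not part of the patch.
 */
public final class MoveCallerSketch {

  private MoveCallerSketch() {
  }

  static void moveAndReport(ReplicationManager rm, ContainerID cid,
      DatanodeDetails src, DatanodeDetails target) throws Exception {
    // move() validates the preconditions synchronously; the returned
    // future completes when the replicate + delete steps finish,
    // fail, or time out.
    CompletableFuture<MoveResult> future = rm.move(cid, src, target);
    MoveResult result = future.join();
    switch (result) {
    case COMPLETED:
      // the replica now lives on target and the source copy was removed
      break;
    case DELETE_FAIL_POLICY:
      // replication succeeded, but deleting the source copy would
      // violate the placement policy; the extra replica is kept
      break;
    case REPLICATION_FAIL_INFLIGHT_REPLICATION:
    case REPLICATION_FAIL_INFLIGHT_DELETION:
      // the container is already taking an inflight action; retry later
      break;
    default:
      // remaining values are precondition or runtime failures; give up
      break;
    }
  }
}
```

Because move() takes the conservative strategy described above, a caller must treat DELETE_FAIL_POLICY as a partial success: the data is safely on the target, but the cluster is temporarily over-replicated until the placement policy allows the source replica to be deleted.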