diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 4d36093ad113..d85d0e3633d2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -272,14 +272,19 @@ private void processContainer(ContainerID id) { StorageClass storageClass = storageClassRegistry.getStorageClass(container.getStorageClass()); - + int closedStateReplicationFactor = + storageClass.getClosedStateConfiguration().getReplicationFactor(); + int openStateReplicationFactor = + storageClass.getOpenStateConfiguration().getReplicationFactor() + .getNumber(); /* * We don't take any action if the container is in OPEN state and * the container is healthy. If the container is not healthy, i.e. * the replicas are not in OPEN state, send CLOSE_CONTAINER command. */ if (state == LifeCycleState.OPEN) { - if (!isContainerHealthy(container, replicas, storageClass)) { + if (!isContainerHealthy(container, replicas, + openStateReplicationFactor)) { eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id); } return; @@ -301,7 +306,7 @@ private void processContainer(ContainerID id) { * container if possible. */ if (state == LifeCycleState.QUASI_CLOSED && - canForceCloseContainer(container, replicas)) { + canForceCloseContainer(container, replicas, storageClass)) { forceCloseContainer(container, replicas); return; } @@ -328,7 +333,8 @@ private void processContainer(ContainerID id) { * the container is either in QUASI_CLOSED or in CLOSED state and has * exact number of replicas in the same state. */ - if (isContainerHealthy(container, replicas, storageClass)) { + if (isContainerHealthy(container, replicas, + closedStateReplicationFactor)) { return; } @@ -336,8 +342,9 @@ private void processContainer(ContainerID id) { * Check if the container is under replicated and take appropriate * action. */ - if (isContainerUnderReplicated(container, replicas, storageClass)) { - handleUnderReplicatedContainer(container, replicas); + if (isContainerUnderReplicated(container, replicas, + closedStateReplicationFactor)) { + handleUnderReplicatedContainer(container, replicas, storageClass); return; } @@ -345,8 +352,9 @@ private void processContainer(ContainerID id) { * Check if the container is over replicated and take appropriate * action. */ - if (isContainerOverReplicated(container, replicas, storageClass)) { - handleOverReplicatedContainer(container, replicas); + if (isContainerOverReplicated(container, replicas, + closedStateReplicationFactor)) { + handleOverReplicatedContainer(container, replicas, storageClass); return; } @@ -404,10 +412,10 @@ private void updateInflightAction(final ContainerInfo container, private boolean isContainerHealthy( final ContainerInfo container, final Set replicas, - final StorageClass storageClass + final int replicationFactor ) { - return !isContainerUnderReplicated(container, replicas, storageClass) && - !isContainerOverReplicated(container, replicas, storageClass) && + return !isContainerUnderReplicated(container, replicas, replicationFactor) && + !isContainerOverReplicated(container, replicas, replicationFactor) && replicas.stream().allMatch( r -> compareState(container.getState(), r.getState())); } @@ -422,13 +430,13 @@ private boolean isContainerHealthy( private boolean isContainerUnderReplicated( final ContainerInfo container, final Set replicas, - StorageClass storageClass + int replicationFactor ) { boolean misReplicated = !getPlacementStatus( replicas, - storageClass.getClosedStateConfiguration().getReplicationFactor()) + replicationFactor) .isPolicySatisfied(); - return storageClass.getClosedStateConfiguration().getReplicationFactor() > + return replicationFactor > getReplicaCount(container.containerID(), replicas) || misReplicated; } @@ -442,9 +450,9 @@ private boolean isContainerUnderReplicated( private boolean isContainerOverReplicated( final ContainerInfo container, final Set replicas, - StorageClass storageClass + final int replicationFactor ) { - return storageClass.getClosedStateConfiguration().getReplicationFactor() < + return replicationFactor < getReplicaCount(container.containerID(), replicas); } @@ -472,10 +480,12 @@ private int getReplicaCount(final ContainerID id, * @return true if we can force close the container, false otherwise */ private boolean canForceCloseContainer(final ContainerInfo container, - final Set replicas) { + final Set replicas, + final StorageClass storageClass) { Preconditions.assertTrue(container.getState() == LifeCycleState.QUASI_CLOSED); - final int replicationFactor = container.getReplicationFactor().getNumber(); + final int replicationFactor = + storageClass.getClosedStateConfiguration().getReplicationFactor(); final long uniqueQuasiClosedReplicaCount = replicas.stream() .filter(r -> r.getState() == State.QUASI_CLOSED) .map(ContainerReplica::getOriginDatanodeId) @@ -529,7 +539,8 @@ private void forceCloseContainer(final ContainerInfo container, * @param replicas Set of ContainerReplicas */ private void handleUnderReplicatedContainer(final ContainerInfo container, - final Set replicas) { + final Set replicas, + final StorageClass storageClass) { LOG.debug("Handling under-replicated container: {}", container.getContainerID()); try { @@ -553,10 +564,9 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toList()); if (source.size() > 0) { - final int replicationFactor = this.storageClassRegistry - .getStorageClass(container.getStorageClass()) - .getOpenStateConfiguration() - .getReplicationFactor().getNumber(); + final int replicationFactor = storageClass + .getClosedStateConfiguration() + .getReplicationFactor(); // Want to check if the container is mis-replicated after considering // inflight add and delete. // Create a new list from source (healthy replicas minus pending delete) @@ -631,10 +641,12 @@ private void handleUnderReplicatedContainer(final ContainerInfo container, * @param replicas Set of ContainerReplicas */ private void handleOverReplicatedContainer(final ContainerInfo container, - final Set replicas) { + final Set replicas, + final StorageClass storageClass) { final ContainerID id = container.containerID(); - final int replicationFactor = container.getReplicationFactor().getNumber(); + final int replicationFactor = storageClass.getClosedStateConfiguration() + .getReplicationFactor(); // Don't consider inflight replication while calculating excess here. int excess = replicas.size() - replicationFactor - inflightDeletion.getOrDefault(id, Collections.emptyList()).size(); @@ -645,17 +657,20 @@ private void handleOverReplicatedContainer(final ContainerInfo container, " is {}, but found {}.", id, replicationFactor, replicationFactor + excess); - final Map uniqueReplicas = - new LinkedHashMap<>(); + final List eligibleReplicas = new ArrayList<>(replicas); - replicas.stream() - .filter(r -> compareState(container.getState(), r.getState())) - .forEach(r -> uniqueReplicas - .putIfAbsent(r.getOriginDatanodeId(), r)); + if (container.getState() != LifeCycleState.CLOSED) { + final Map uniqueReplicas = + new LinkedHashMap<>(); - // Retain one healthy replica per origin node Id. - final List eligibleReplicas = new ArrayList<>(replicas); - eligibleReplicas.removeAll(uniqueReplicas.values()); + replicas.stream() + .filter(r -> compareState(container.getState(), r.getState())) + .forEach(r -> uniqueReplicas + .putIfAbsent(r.getOriginDatanodeId(), r)); + + // Retain one healthy replica per origin node Id. + eligibleReplicas.removeAll(uniqueReplicas.values()); + } final List unhealthyReplicas = eligibleReplicas .stream()