diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckReplicaCount.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckReplicaCount.java new file mode 100644 index 000000000000..412978c240ef --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckReplicaCount.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; + +/** + * Class to count the replicas in a quasi-closed stuck container. 
+ */ +public class QuasiClosedStuckReplicaCount { + + private final Map> replicasByOrigin = new HashMap<>(); + private final Map> inServiceReplicasByOrigin = new HashMap<>(); + private final Map> maintenanceReplicasByOrigin = new HashMap<>(); + private boolean hasOutOfServiceReplicas = false; + private int minHealthyForMaintenance; + private boolean hasHealthyReplicas = false; + + public QuasiClosedStuckReplicaCount(Set replicas, int minHealthyForMaintenance) { + this.minHealthyForMaintenance = minHealthyForMaintenance; + for (ContainerReplica r : replicas) { + if (r.getState() != StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY) { + hasHealthyReplicas = true; + } + replicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r); + HddsProtos.NodeOperationalState opState = r.getDatanodeDetails().getPersistedOpState(); + if (opState == HddsProtos.NodeOperationalState.IN_SERVICE) { + inServiceReplicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r); + } else if (opState == HddsProtos.NodeOperationalState.IN_MAINTENANCE + || opState == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE) { + maintenanceReplicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r); + hasOutOfServiceReplicas = true; + } else { + hasOutOfServiceReplicas = true; + } + } + } + + public int availableOrigins() { + return replicasByOrigin.size(); + } + + public boolean hasOutOfServiceReplicas() { + return hasOutOfServiceReplicas; + } + + public boolean hasHealthyReplicas() { + return hasHealthyReplicas; + } + + public boolean isUnderReplicated() { + return !getUnderReplicatedReplicas().isEmpty(); + } + + public List getUnderReplicatedReplicas() { + List misReplicatedOrigins = new ArrayList<>(); + + if (replicasByOrigin.size() == 1) { + Map.Entry> entry = replicasByOrigin.entrySet().iterator().next(); + Set inService = inServiceReplicasByOrigin.get(entry.getKey()); + if (inService == 
null) { + inService = Collections.emptySet(); + } + Set maintenance = maintenanceReplicasByOrigin.get(entry.getKey()); + int maintenanceCount = maintenance == null ? 0 : maintenance.size(); + + if (maintenanceCount > 0) { + if (inService.size() < minHealthyForMaintenance) { + int additionalReplicas = minHealthyForMaintenance - inService.size(); + misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), additionalReplicas)); + } + } else { + if (inService.size() < 3) { + int additionalReplicas = 3 - inService.size(); + misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), additionalReplicas)); + } + } + return misReplicatedOrigins; + } + + // If there are multiple origins, we expect 2 copies of each origin + // For maintenance, we expect 1 copy of each origin and ignore the minHealthyForMaintenance parameter + for (Map.Entry> entry : replicasByOrigin.entrySet()) { + Set inService = inServiceReplicasByOrigin.get(entry.getKey()); + if (inService == null) { + inService = Collections.emptySet(); + } + Set maintenance = maintenanceReplicasByOrigin.get(entry.getKey()); + int maintenanceCount = maintenance == null ? 0 : maintenance.size(); + + if (inService.size() < 2) { + if (maintenanceCount > 0) { + if (inService.isEmpty()) { + // We need 1 copy online for maintenance + misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), 1)); + } + } else { + misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), 2 - inService.size())); + } + } + } + return misReplicatedOrigins; + } + + /** + * Returns True is the container is over-replicated. This means that if we have a single origin, there are more than + * 3 copies. If we have multiple origins, there are more than 2 copies of each origin. + * The over replication check ignore maintenance replicas. The container may become over replicated when maintenance + * ends. 
+ * + * @return True if the container is over-replicated, otherwise false + */ + public boolean isOverReplicated() { + return !getOverReplicatedOrigins().isEmpty(); + } + + public List getOverReplicatedOrigins() { + // If there is only a single origin, we expect 3 copies, otherwise we expect 2 copies of each origin + if (replicasByOrigin.size() == 1) { + UUID origin = replicasByOrigin.keySet().iterator().next(); + Set inService = inServiceReplicasByOrigin.get(origin); + if (inService != null && inService.size() > 3) { + return Collections.singletonList(new MisReplicatedOrigin(inService, inService.size() - 3)); + } + return Collections.emptyList(); + } + + // If there are multiple origins, we expect 2 copies of each origin + List overReplicatedOrigins = new ArrayList<>(); + for (UUID origin : replicasByOrigin.keySet()) { + Set replicas = inServiceReplicasByOrigin.get(origin); + if (replicas != null && replicas.size() > 2) { + overReplicatedOrigins.add(new MisReplicatedOrigin(replicas, replicas.size() - 2)); + } + } + // If we have 2 copies or less of each origin, we are not over-replicated + return overReplicatedOrigins; + } + + /** + * Class to represent the origin of under replicated replicas and the number of additional replicas required. 
+ */ + public static class MisReplicatedOrigin { + + private final Set sources; + private final int replicaDelta; + + public MisReplicatedOrigin(Set sources, int replicaDelta) { + this.sources = sources; + this.replicaDelta = replicaDelta; + } + + public Set getSources() { + return sources; + } + + public int getReplicaDelta() { + return replicaDelta; + } + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckUnderReplicationHandler.java new file mode 100644 index 000000000000..fd442eb1976f --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/QuasiClosedStuckUnderReplicationHandler.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to correct under replicated QuasiClosed Stuck Ratis containers. + */ +public class QuasiClosedStuckUnderReplicationHandler implements UnhealthyReplicationHandler { + public static final Logger LOG = LoggerFactory.getLogger(QuasiClosedStuckUnderReplicationHandler.class); + + private final PlacementPolicy placementPolicy; + private final ReplicationManager replicationManager; + private final long currentContainerSize; + private final ReplicationManagerMetrics metrics; + + public QuasiClosedStuckUnderReplicationHandler(final PlacementPolicy placementPolicy, + final ConfigurationSource conf, final ReplicationManager replicationManager) { + this.placementPolicy = placementPolicy; + this.currentContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + this.replicationManager = replicationManager; + this.metrics = replicationManager.getMetrics(); + } + + @Override + public int processAndSendCommands(Set replicas, List pendingOps, + ContainerHealthResult result, int remainingMaintenanceRedundancy) throws IOException { + ContainerInfo containerInfo = 
result.getContainerInfo(); + LOG.debug("Handling under replicated QuasiClosed Stuck Ratis container {}", containerInfo); + + int pendingAdd = 0; + for (ContainerReplicaOp op : pendingOps) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) { + pendingAdd++; + } + } + + if (pendingAdd > 0) { + LOG.debug("Container {} has pending add operations. No more replication will be scheduled until they complete", + containerInfo); + return 0; + } + + QuasiClosedStuckReplicaCount replicaCount = + new QuasiClosedStuckReplicaCount(replicas, remainingMaintenanceRedundancy); + + List misReplicatedOrigins + = replicaCount.getUnderReplicatedReplicas(); + + if (misReplicatedOrigins.isEmpty()) { + LOG.debug("Container {} is not under replicated", containerInfo); + return 0; + } + + // Schedule Replicas for the under replicated origins. + int totalRequiredReplicas = 0; + int totalCommandsSent = 0; + IOException firstException = null; + List mutablePendingOps = new ArrayList<>(pendingOps); + for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin origin : misReplicatedOrigins) { + totalRequiredReplicas += origin.getReplicaDelta(); + List targets; + try { + targets = getTargets(containerInfo, replicas, origin.getReplicaDelta(), mutablePendingOps); + } catch (SCMException e) { + if (firstException == null) { + firstException = e; + } + LOG.warn("Cannot replicate container {} because no suitable targets were found.", containerInfo, e); + continue; + } + + List sourceDatanodes = origin.getSources().stream() + .map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + for (DatanodeDetails target : targets) { + try { + replicationManager.sendThrottledReplicationCommand(containerInfo, sourceDatanodes, target, 0); + // Add the pending op, so we exclude the node for subsequent origins + mutablePendingOps.add(ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, target, 0)); + totalCommandsSent++; + } catch (CommandTargetOverloadedException e) { + 
LOG.warn("Cannot replicate container {} because all sources are overloaded.", containerInfo); + if (firstException == null) { + firstException = e; + } + } + } + } + + if (firstException != null || totalCommandsSent < totalRequiredReplicas) { + // Some commands were not sent as expected (not enough nodes found or overloaded nodes), so we just rethrow + // the first exception we encountered. + LOG.info("A command was not sent for all required new replicas for container {}. Total sent {}, required {} ", + containerInfo, totalCommandsSent, totalRequiredReplicas); + metrics.incrPartialReplicationTotal(); + if (firstException != null) { + throw firstException; + } else { + throw new InsufficientDatanodesException(totalRequiredReplicas, totalCommandsSent); + } + } + return totalCommandsSent; + } + + private List getTargets(ContainerInfo containerInfo, + Set replicas, int additionalRequired, List pendingOps) throws IOException { + LOG.debug("Need {} target datanodes for container {}. Current replicas: {}.", + additionalRequired, containerInfo, replicas); + + ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes = + ReplicationManagerUtil.getExcludedAndUsedNodes(containerInfo, new ArrayList<>(replicas), Collections.emptySet(), + pendingOps, replicationManager); + + List excluded = excludedAndUsedNodes.getExcludedNodes(); + List used = excludedAndUsedNodes.getUsedNodes(); + + LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. 
", + used, used.size(), excluded, excluded.size()); + + return ReplicationManagerUtil.getTargetDatanodes(placementPolicy, + additionalRequired, used, excluded, currentContainerSize, containerInfo); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 0dee75f559e5..edaaf7ef2627 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler; import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler; import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler; +import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedStuckReplicationCheck; import org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler; import org.apache.hadoop.hdds.scm.container.replication.health.RatisUnhealthyReplicationCheckHandler; import org.apache.hadoop.hdds.scm.container.replication.health.VulnerableUnhealthyReplicasHandler; @@ -182,6 +183,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp private final RatisUnderReplicationHandler ratisUnderReplicationHandler; private final RatisOverReplicationHandler ratisOverReplicationHandler; private final RatisMisReplicationHandler ratisMisReplicationHandler; + private final QuasiClosedStuckUnderReplicationHandler quasiClosedStuckUnderReplicationHandler; private Thread underReplicatedProcessorThread; private Thread overReplicatedProcessorThread; private final UnderReplicatedProcessor underReplicatedProcessor; @@ -248,6 +250,8 @@ public 
ReplicationManager(final ConfigurationSource conf, new RatisOverReplicationHandler(ratisContainerPlacement, this); ratisMisReplicationHandler = new RatisMisReplicationHandler( ratisContainerPlacement, conf, this); + quasiClosedStuckUnderReplicationHandler = + new QuasiClosedStuckUnderReplicationHandler(ratisContainerPlacement, conf, this); underReplicatedProcessor = new UnderReplicatedProcessor(this, rmConf::getUnderReplicatedInterval); overReplicatedProcessor = @@ -262,6 +266,7 @@ public ReplicationManager(final ConfigurationSource conf, .addNext(new MismatchedReplicasHandler(this)) .addNext(new EmptyContainerHandler(this)) .addNext(new DeletingContainerHandler(this)) + .addNext(new QuasiClosedStuckReplicationCheck()) .addNext(ecReplicationCheckHandler) .addNext(ratisReplicationCheckHandler) .addNext(new ClosedWithUnhealthyReplicasHandler(this)) @@ -746,8 +751,15 @@ int processUnderReplicatedContainer( if (result.getHealthState() == ContainerHealthResult.HealthState.UNDER_REPLICATED) { - handler = isEC ? ecUnderReplicationHandler - : ratisUnderReplicationHandler; + if (isEC) { + handler = ecUnderReplicationHandler; + } else { + if (QuasiClosedStuckReplicationCheck.shouldHandleAsQuasiClosedStuck(result.getContainerInfo(), replicas)) { + handler = quasiClosedStuckUnderReplicationHandler; + } else { + handler = ratisUnderReplicationHandler; + } + } } else if (result.getHealthState() == ContainerHealthResult.HealthState.MIS_REPLICATED) { handler = isEC ? 
ecMisReplicationHandler : ratisMisReplicationHandler; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedContainerHandler.java index 11b45755a62c..bfac61f404f9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedContainerHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedContainerHandler.java @@ -85,6 +85,17 @@ public boolean handle(ContainerCheckRequest request) { return false; } + /** + * Returns true if the container is stuck in QUASI_CLOSED state, otherwise false. + * @param container The container to check + * @param replicas Set of ContainerReplicas + * @return true if the container is stuck in QUASI_CLOSED state, otherwise false + */ + public static boolean isQuasiClosedStuck(final ContainerInfo container, + final Set replicas) { + return !canForceCloseContainer(container, replicas); + } + /** * Returns true if more than 50% of the container replicas with unique * originNodeId are in QUASI_CLOSED state. 
@@ -93,7 +104,7 @@ public boolean handle(ContainerCheckRequest request) { * @param replicas Set of ContainerReplicas * @return true if we can force close the container, false otherwise */ - private boolean canForceCloseContainer(final ContainerInfo container, + private static boolean canForceCloseContainer(final ContainerInfo container, final Set replicas) { final int replicationFactor = container.getReplicationConfig().getRequiredNodes(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedStuckReplicationCheck.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedStuckReplicationCheck.java new file mode 100644 index 000000000000..7882dfd32eb6 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/QuasiClosedStuckReplicationCheck.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication.health; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; + +import java.util.Set; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; +import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest; +import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp; +import org.apache.hadoop.hdds.scm.container.replication.QuasiClosedStuckReplicaCount; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to check for the replication of the replicas in quasi-closed stuck containers. As we want to maintain + * as much data and information as possible, the rule for QC stuck container is to maintain 2 copies of each origin + * if there is more than 1 origin. If there is only 1 origin, then we need to maintain 3 copies. + */ +public class QuasiClosedStuckReplicationCheck extends AbstractCheck { + public static final Logger LOG = LoggerFactory.getLogger(QuasiClosedStuckReplicationCheck.class); + + public static boolean shouldHandleAsQuasiClosedStuck(ContainerInfo containerInfo, Set replicas) { + if (containerInfo.getState() != QUASI_CLOSED) { + return false; + } + if (!QuasiClosedContainerHandler.isQuasiClosedStuck(containerInfo, replicas)) { + return false; + } + QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 0); + if (replicaCount.availableOrigins() == 1) { + // This is the 3 copies of a single origin case, so allow it to be handled via the normal under-replicated + // handler. 
+ return false; + } + // If we have all origins with open replicas, and not unhealthy then the container should close after the close + // goes through, so this handler should not run. + return !hasEnoughOriginsWithOpen(containerInfo, replicas); + } + + @Override + public boolean handle(ContainerCheckRequest request) { + if (!shouldHandleAsQuasiClosedStuck(request.getContainerInfo(), request.getContainerReplicas())) { + return false; + } + + if (request.getContainerReplicas().isEmpty()) { + // If there are no replicas, then mark as missing and return. + request.getReport().incrementAndSample( + ReplicationManagerReport.HealthState.MISSING, request.getContainerInfo().containerID()); + return true; + } + + QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount( + request.getContainerReplicas(), request.getMaintenanceRedundancy()); + + if (!replicaCount.hasHealthyReplicas()) { + // All unhealthy are handled by a different handler + return false; + } + + int pendingAdd = 0; + int pendingDelete = 0; + for (ContainerReplicaOp op : request.getPendingOps()) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) { + pendingAdd++; + } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) { + pendingDelete++; + } + } + + if (replicaCount.isUnderReplicated()) { + LOG.debug("Container {} is quasi-closed-stuck under-replicated", request.getContainerInfo()); + request.getReport().incrementAndSample(ReplicationManagerReport.HealthState.UNDER_REPLICATED, + request.getContainerInfo().containerID()); + if (pendingAdd == 0) { + // Only queue if there are no pending adds, as that could correct the under replication. 
+ LOG.debug("Queueing under-replicated health result for container {}", request.getContainerInfo()); + ContainerHealthResult.UnderReplicatedHealthResult underReplicatedHealthResult = + new ContainerHealthResult.UnderReplicatedHealthResult(request.getContainerInfo(), 1, + replicaCount.hasOutOfServiceReplicas(), false, false); + request.getReplicationQueue().enqueue(underReplicatedHealthResult); + } + return true; + } + + if (replicaCount.isOverReplicated()) { + LOG.debug("Container {} is quasi-closed-stuck over-replicated", request.getContainerInfo()); + request.getReport().incrementAndSample(ReplicationManagerReport.HealthState.OVER_REPLICATED, + request.getContainerInfo().containerID()); + if (pendingDelete == 0) { + // Only queue if there are no pending deletes which could correct the over replication + LOG.debug("Queueing over-replicated health result for container {}", request.getContainerInfo()); + ContainerHealthResult.OverReplicatedHealthResult overReplicatedHealthResult = + new ContainerHealthResult.OverReplicatedHealthResult(request.getContainerInfo(), 1, false); + request.getReplicationQueue().enqueue(overReplicatedHealthResult); + } + return true; + } + return false; + } + + private static boolean hasEnoughOriginsWithOpen(ContainerInfo containerInfo, Set replicas) { + final long uniqueOpenReplicaCount = replicas.stream() + .filter(r -> r.getState() == State.QUASI_CLOSED || r.getState() == State.OPEN) + .map(ContainerReplica::getOriginDatanodeId) + .distinct() + .count(); + final int replicationFactor = containerInfo.getReplicationConfig().getRequiredNodes(); + return uniqueOpenReplicaCount >= replicationFactor; + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java index 3ed7348112a6..4ddd99aeb3e6 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/RatisReplicationCheckHandler.java @@ -79,6 +79,10 @@ public boolean handle(ContainerCheckRequest request) { // This handler is only for Ratis containers. return false; } + if (QuasiClosedStuckReplicationCheck + .shouldHandleAsQuasiClosedStuck(request.getContainerInfo(), request.getContainerReplicas())) { + return false; + } ReplicationManagerReport report = request.getReport(); ContainerInfo container = request.getContainerInfo(); ContainerHealthResult health = checkHealth(request); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java index 0d66dbd4e415..a597edd3b7f2 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java @@ -125,6 +125,18 @@ public static Set createReplicasWithSameOrigin( return replicas; } + public static Set createReplicasWithOriginAndOpState( + ContainerID containerID, ContainerReplicaProto.State replicaState, + Pair... 
nodes) { + Set replicas = new HashSet<>(); + for (Pair i : nodes) { + replicas.add(createContainerReplica( + containerID, 0, i.getRight(), replicaState, 123L, 1234L, + MockDatanodeDetails.randomDatanodeDetails(), i.getLeft())); + } + return replicas; + } + public static ContainerReplica createContainerReplica(ContainerID containerID, int replicaIndex, HddsProtos.NodeOperationalState opState, ContainerReplicaProto.State replicaState) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckReplicaCount.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckReplicaCount.java new file mode 100644 index 000000000000..2e19e788509e --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckReplicaCount.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED; +import static org.apache.ratis.util.Preconditions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Tests for the QuasiClosedStuckReplicaCount class. 
+ */
+public class TestQuasiClosedStuckReplicaCount {
+
+  // Three distinct origin datanode ids, regenerated before every test.
+  private UUID origin1;
+  private UUID origin2;
+  private UUID origin3;
+
+  @BeforeEach
+  public void setUp() {
+    origin1 = UUID.randomUUID();
+    origin2 = UUID.randomUUID();
+    origin3 = UUID.randomUUID();
+  }
+
+  // In every test below, the second constructor argument of QuasiClosedStuckReplicaCount is
+  // minHealthyForMaintenance.
+
+  @Test
+  public void testCorrectlyReplicationWithThreeOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE), Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    assertTrue(replicaCount.getUnderReplicatedReplicas().isEmpty());
+  }
+
+  @Test
+  public void testCorrectReplicationWithTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    assertTrue(replicaCount.getUnderReplicatedReplicas().isEmpty());
+  }
+
+  @Test
+  public void testCorrectReplicationWithOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    assertTrue(replicaCount.getUnderReplicatedReplicas().isEmpty());
+  }
+
+  @Test
+  public void testUnderReplicationWithThreeOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 1, 1, origin3);
+  }
+
+  @Test
+  public void testUnderReplicationWithThreeOriginsTwoUnderReplicated() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+
+    List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins =
+        replicaCount.getUnderReplicatedReplicas();
+    assertEquals(2, misReplicatedOrigins.size());
+
+    // The order of the returned entries is not asserted; each one must belong to origin1 or origin3.
+    for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin misReplicatedOrigin : misReplicatedOrigins) {
+      UUID source = misReplicatedOrigin.getSources().iterator().next().getOriginDatanodeId();
+      assertTrue(source.equals(origin1) || source.equals(origin3));
+    }
+  }
+
+  @Test
+  public void testUnderReplicationWithTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 1, 1, origin2);
+  }
+
+  @Test
+  public void testUnderReplicationWithOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 1, 2, origin1);
+  }
+
+  @Test
+  public void testOverReplicationWithThreeOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE), Pair.of(origin3, IN_SERVICE), Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertTrue(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getOverReplicatedOrigins(), 1, 3, 1, origin3);
+  }
+
+  @Test
+  public void testOverReplicationWithTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertTrue(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getOverReplicatedOrigins(), 1, 3, 1, origin2);
+  }
+
+  @Test
+  public void testOverReplicationWithOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin1, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertTrue(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getOverReplicatedOrigins(), 1, 4, 1, origin1);
+  }
+
+  @Test
+  public void testUnderReplicationDueToDecommissionWithThreeOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, DECOMMISSIONING), Pair.of(origin1, DECOMMISSIONING),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE), Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 2, 2, origin1);
+  }
+
+  @Test
+  public void testUnderReplicationDueToDecommissionWithTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, DECOMMISSIONING),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 2, 1, origin1);
+  }
+
+  @Test
+  public void testUnderReplicationDueToDecommissionWithOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, DECOMMISSIONING), Pair.of(origin1, DECOMMISSIONING));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 3, 2, origin1);
+  }
+
+  @Test
+  public void testNoOverReplicationWithOutOfServiceReplicasWithThreeOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, DECOMMISSIONED),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE),
+        Pair.of(origin3, IN_SERVICE), Pair.of(origin3, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+  }
+
+  @Test
+  public void testNoOverReplicationWithOutOfServiceReplicasWithTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, DECOMMISSIONED),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+  }
+
+  @Test
+  public void testNoOverReplicationWithOutOfServiceReplicasWithOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin1, DECOMMISSIONED));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+  }
+
+  @Test
+  public void testUnderReplicationWithMaintenanceWithOneOrigin() {
+    // Two in-service copies plus one entering maintenance, minHealthyForMaintenance = 1.
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, ENTERING_MAINTENANCE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+
+    // One in-service copy plus two entering maintenance, minHealthyForMaintenance = 2.
+    replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, ENTERING_MAINTENANCE), Pair.of(origin1, ENTERING_MAINTENANCE));
+
+    replicaCount = new QuasiClosedStuckReplicaCount(replicas, 2);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 3, 1, origin1);
+  }
+
+  @Test
+  public void testUnderReplicationWithMaintenanceWithTwoOrigins() {
+    // origin1 keeps one in-service copy while its other copy enters maintenance.
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, ENTERING_MAINTENANCE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+
+    // Both copies of origin1 are entering maintenance.
+    replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, ENTERING_MAINTENANCE), Pair.of(origin1, ENTERING_MAINTENANCE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertTrue(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+    validateMisReplicatedOrigins(replicaCount.getUnderReplicatedReplicas(), 1, 2, 1, origin1);
+  }
+
+  @Test
+  public void testNoOverReplicationWithExcessMaintenanceReplicasTwoOrigins() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_MAINTENANCE),
+        Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+  }
+
+  @Test
+  public void testNoOverReplicationWithExcessMaintenanceReplicasOneOrigin() {
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(
+        ContainerID.valueOf(1), QUASI_CLOSED,
+        Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE),
+        Pair.of(origin1, IN_MAINTENANCE));
+
+    QuasiClosedStuckReplicaCount replicaCount = new QuasiClosedStuckReplicaCount(replicas, 1);
+    assertFalse(replicaCount.isUnderReplicated());
+    assertFalse(replicaCount.isOverReplicated());
+  }
+
+  /**
+   * Checks the size of a mis-replicated origin list and the contents of its first entry.
+   * Only the entry at index 0 is inspected, so callers expecting several entries must check the
+   * others themselves.
+   *
+   * @param misReplicatedOrigins list returned by getUnderReplicatedReplicas() or getOverReplicatedOrigins()
+   * @param expectedUnderRepOrigins expected number of entries in the list
+   * @param expectedSources expected number of source replicas in the first entry
+   * @param expectedDelta expected getReplicaDelta() value of the first entry
+   * @param expectedOrigin origin datanode id that every source replica of the first entry must report
+   */
+  private void validateMisReplicatedOrigins(
+      List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins,
+      int expectedUnderRepOrigins, int expectedSources, int expectedDelta, UUID expectedOrigin) {
+
+    assertEquals(expectedUnderRepOrigins, misReplicatedOrigins.size());
+    Set<ContainerReplica> sources = misReplicatedOrigins.get(0).getSources();
+    assertEquals(expectedSources, sources.size());
+    for (ContainerReplica source : sources) {
+      assertEquals(expectedOrigin, source.getOriginDatanodeId());
+    }
+    assertEquals(expectedDelta, misReplicatedOrigins.get(0).getReplicaDelta());
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java new file mode 100644 index 000000000000..2c46aea502cf --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestQuasiClosedStuckUnderReplicationHandler.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.ratis.protocol.exceptions.NotLeaderException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test for QuasiClosedStuckUnderReplicationHandler. 
+ */
+public class TestQuasiClosedStuckUnderReplicationHandler {
+
+  private static final RatisReplicationConfig RATIS_REPLICATION_CONFIG = RatisReplicationConfig.getInstance(THREE);
+  private ContainerInfo container;
+  private NodeManager nodeManager;
+  private OzoneConfiguration conf;
+  private ReplicationManager replicationManager;
+  private ReplicationManagerMetrics metrics;
+  private PlacementPolicy policy;
+  // Collects the commands captured by the ReplicationManager mocks installed in setup().
+  private Set<Pair<UUID, SCMCommand<?>>> commandsSent;
+  private QuasiClosedStuckUnderReplicationHandler handler;
+
+
+  /**
+   * Builds a QUASI_CLOSED container, a mocked ReplicationManager whose configuration has
+   * hdds.scm.replication.push set to true, and the handler under test.
+   */
+  @BeforeEach
+  void setup(@TempDir File testDir) throws NodeNotFoundException,
+      CommandTargetOverloadedException, NotLeaderException {
+    container = ReplicationTestUtil.createContainer(
+        HddsProtos.LifeCycleState.QUASI_CLOSED, RATIS_REPLICATION_CONFIG);
+
+    nodeManager = mock(NodeManager.class);
+    conf = SCMTestUtils.getConf(testDir);
+    policy = ReplicationTestUtil
+        .getSimpleTestPlacementPolicy(nodeManager, conf);
+    replicationManager = mock(ReplicationManager.class);
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.setBoolean("hdds.scm.replication.push", true);
+    when(replicationManager.getConfig())
+        .thenReturn(ozoneConfiguration.getObject(
+            ReplicationManager.ReplicationManagerConfiguration.class));
+    metrics = ReplicationManagerMetrics.create(replicationManager);
+    when(replicationManager.getMetrics()).thenReturn(metrics);
+
+    /*
+      Return NodeStatus with NodeOperationalState as specified in
+      DatanodeDetails, and NodeState as HEALTHY.
+    */
+    when(
+        replicationManager.getNodeStatus(any(DatanodeDetails.class)))
+        .thenAnswer(invocationOnMock -> {
+          DatanodeDetails dn = invocationOnMock.getArgument(0);
+          return new NodeStatus(dn.getPersistedOpState(),
+              HddsProtos.NodeState.HEALTHY);
+        });
+
+    commandsSent = new HashSet<>();
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(
+        replicationManager, commandsSent, new AtomicBoolean(false));
+    ReplicationTestUtil.mockRMSendDatanodeCommand(replicationManager,
+        commandsSent);
+    ReplicationTestUtil.mockRMSendDeleteCommand(replicationManager,
+        commandsSent);
+    handler = new QuasiClosedStuckUnderReplicationHandler(policy, conf,
+        replicationManager);
+  }
+
+  @Test
+  public void testReturnsZeroIfNotUnderReplicated() throws IOException {
+    UUID origin = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE),
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE),
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE));
+
+    int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getUnderReplicatedHealthResult(), 1);
+    assertEquals(0, count);
+  }
+
+  @Test
+  public void testNoCommandsScheduledIfPendingOps() throws IOException {
+    UUID origin = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE),
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE));
+    // A single pending ADD operation is passed to the handler; the expected count is 0.
+    List<ContainerReplicaOp> pendingOps = new ArrayList<>();
+    pendingOps.add(ContainerReplicaOp.create(
+        ContainerReplicaOp.PendingOpType.ADD, MockDatanodeDetails.randomDatanodeDetails(), 0));
+
+    int count = handler.processAndSendCommands(replicas, pendingOps, getUnderReplicatedHealthResult(), 1);
+    assertEquals(0, count);
+  }
+
+  @Test
+  public void testCommandScheduledForUnderReplicatedContainer() throws IOException {
+    UUID origin = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin, HddsProtos.NodeOperationalState.IN_SERVICE));
+
+    int count = handler.processAndSendCommands(replicas, Collections.emptyList(), getUnderReplicatedHealthResult(), 1);
+    assertEquals(2, count);
+  }
+
+  @Test
+  public void testOverloadedExceptionContinuesAndThrows() throws NotLeaderException, CommandTargetOverloadedException {
+    UUID origin1 = UUID.randomUUID();
+    UUID origin2 = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
+        Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
+
+    // Re-mock the throttled send with the flag set to true.
+    // NOTE(review): presumably this makes a send fail with CommandTargetOverloadedException;
+    // confirm against ReplicationTestUtil.
+    ReplicationTestUtil.mockRMSendThrottleReplicateCommand(replicationManager, commandsSent, new AtomicBoolean(true));
+
+    assertThrows(CommandTargetOverloadedException.class, () ->
+        handler.processAndSendCommands(replicas, Collections.emptyList(), getUnderReplicatedHealthResult(), 1));
+    assertEquals(1, commandsSent.size());
+  }
+
+  @Test
+  public void testInsufficientNodesExceptionThrown() {
+    UUID origin1 = UUID.randomUUID();
+    UUID origin2 = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE),
+        Pair.of(origin2, HddsProtos.NodeOperationalState.IN_SERVICE));
+
+    // Rebuild the handler on top of the "no nodes" placement policy.
+    policy = ReplicationTestUtil.getNoNodesTestPlacementPolicy(nodeManager, conf);
+    handler = new QuasiClosedStuckUnderReplicationHandler(policy, conf, replicationManager);
+
+    assertThrows(SCMException.class, () ->
+        handler.processAndSendCommands(replicas, Collections.emptyList(), getUnderReplicatedHealthResult(), 1));
+    assertEquals(0, commandsSent.size());
+  }
+
+  @Test
+  public void testPartialReplicationExceptionThrown() {
+    UUID origin1 = UUID.randomUUID();
+    Set<ContainerReplica> replicas = ReplicationTestUtil.createReplicasWithOriginAndOpState(container.containerID(),
+        StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED,
+        Pair.of(origin1, HddsProtos.NodeOperationalState.IN_SERVICE));
+
+    // Rebuild the handler on top of the "insufficient nodes" placement policy.
+    policy = ReplicationTestUtil.getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+    handler = new QuasiClosedStuckUnderReplicationHandler(policy, conf, replicationManager);
+
+    assertThrows(InsufficientDatanodesException.class, () ->
+        handler.processAndSendCommands(replicas, Collections.emptyList(), getUnderReplicatedHealthResult(), 1));
+    assertEquals(1, commandsSent.size());
+  }
+
+  /**
+   * Returns a mocked UnderReplicatedHealthResult whose getContainerInfo() yields the test container.
+   */
+  private ContainerHealthResult.UnderReplicatedHealthResult getUnderReplicatedHealthResult() {
+    ContainerHealthResult.UnderReplicatedHealthResult
+        healthResult = mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
+    when(healthResult.getContainerInfo()).thenReturn(container);
+    return healthResult;
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index e0a4130021d1..3bc1825290e3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -39,7 +39,6 @@
 import static
org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyList; -import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; @@ -49,7 +48,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import com.google.protobuf.Proto2Utils; import java.io.IOException; import java.time.Instant; @@ -462,104 +460,6 @@ public void testQuasiClosedContainerWithUnhealthyReplicaOnUniqueOrigin() replicas.add(unhealthy); storeContainerAndReplicas(container, replicas); - replicationManager.processContainer(container, repQueue, repReport); - assertEquals(0, repReport.getStat( - ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - assertEquals(0, repReport.getStat( - ReplicationManagerReport.HealthState.OVER_REPLICATED)); - assertEquals(0, repQueue.underReplicatedQueueSize()); - assertEquals(0, repQueue.overReplicatedQueueSize()); - } - - @Test - public void testQuasiClosedContainerWithVulnerableUnhealthyReplica() - throws IOException, NodeNotFoundException { - RatisReplicationConfig ratisRepConfig = - RatisReplicationConfig.getInstance(THREE); - long sequenceID = 10; - ContainerInfo container = createContainerInfo(ratisRepConfig, 1, - HddsProtos.LifeCycleState.QUASI_CLOSED, sequenceID); - - // this method creates replicas with same origin id and zero sequence id - Set replicas = - createReplicasWithSameOrigin(container.containerID(), - ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0); - replicas.add(createContainerReplica(container.containerID(), 0, - IN_SERVICE, ContainerReplicaProto.State.UNHEALTHY, sequenceID)); - ContainerReplica decommissioning = - createContainerReplica(container.containerID(), 0, DECOMMISSIONING, - ContainerReplicaProto.State.UNHEALTHY, sequenceID); - replicas.add(decommissioning); - 
storeContainerAndReplicas(container, replicas); - when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) - .thenAnswer(invocation -> { - DatanodeDetails dn = invocation.getArgument(0); - if (dn.equals(decommissioning.getDatanodeDetails())) { - return new NodeStatus(DECOMMISSIONING, HddsProtos.NodeState.HEALTHY); - } - - return NodeStatus.inServiceHealthy(); - }); - - replicationManager.processContainer(container, repQueue, repReport); - assertEquals(1, repReport.getStat( - ReplicationManagerReport.HealthState.UNDER_REPLICATED)); - assertEquals(0, repReport.getStat( - ReplicationManagerReport.HealthState.OVER_REPLICATED)); - assertEquals(1, repQueue.underReplicatedQueueSize()); - assertEquals(0, repQueue.overReplicatedQueueSize()); - - when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(), eq(null), eq(1), anyLong(), - anyLong())).thenAnswer(invocation -> ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails())); - when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class), any(), any())) - .thenAnswer(invocation -> { - Map map = new HashMap<>(); - map.put(SCMCommandProto.Type.replicateContainerCommand, 0); - map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0); - return map; - }); - RatisUnderReplicationHandler handler = - new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration, replicationManager); - - handler.processAndSendCommands(replicas, Collections.emptyList(), repQueue.dequeueUnderReplicatedContainer(), 2); - assertEquals(1, commandsSent.size()); - Pair> command = commandsSent.iterator().next(); - assertEquals(SCMCommandProto.Type.replicateContainerCommand, command.getValue().getType()); - assertEquals(decommissioning.getDatanodeDetails().getUuid(), command.getKey()); - } - - - /** - * There is a QUASI_CLOSED container with some UNHEALTHY replicas on unique origin nodes. 
If the datanode hosting - * one such replica is being taken offline, then the UNHEALTHY replica needs to be replicated to another node. - */ - @Test - public void testQuasiClosedContainerWithUnhealthyReplicaOnDecommissioningNodeWithUniqueOrigin() - throws IOException, NodeNotFoundException { - RatisReplicationConfig ratisRepConfig = - RatisReplicationConfig.getInstance(THREE); - // create a QUASI_CLOSED container with 3 QUASI_CLOSED replicas on same origin, and 1 UNHEALTHY on unique origin - ContainerInfo container = createContainerInfo(ratisRepConfig, 1, - HddsProtos.LifeCycleState.QUASI_CLOSED); - Set replicas = - createReplicasWithSameOrigin(container.containerID(), - ContainerReplicaProto.State.QUASI_CLOSED, 0, 0, 0); - ContainerReplica unhealthy = - createContainerReplica(container.containerID(), 0, DECOMMISSIONING, - ContainerReplicaProto.State.UNHEALTHY); - replicas.add(unhealthy); - storeContainerAndReplicas(container, replicas); - when(replicationManager.getNodeStatus(any(DatanodeDetails.class))) - .thenAnswer(invocation -> { - DatanodeDetails dn = invocation.getArgument(0); - if (dn.equals(unhealthy.getDatanodeDetails())) { - return new NodeStatus(DECOMMISSIONING, HddsProtos.NodeState.HEALTHY); - } - - return NodeStatus.inServiceHealthy(); - }); - - // the container should be under replicated and queued to under replication queue replicationManager.processContainer(container, repQueue, repReport); assertEquals(1, repReport.getStat( ReplicationManagerReport.HealthState.UNDER_REPLICATED)); @@ -567,26 +467,6 @@ public void testQuasiClosedContainerWithUnhealthyReplicaOnDecommissioningNodeWit ReplicationManagerReport.HealthState.OVER_REPLICATED)); assertEquals(1, repQueue.underReplicatedQueueSize()); assertEquals(0, repQueue.overReplicatedQueueSize()); - - // next, this test sets up some mocks to test if RatisUnderReplicationHandler will handle this container correctly - when(ratisPlacementPolicy.chooseDatanodes(anyList(), anyList(), eq(null), eq(1), 
anyLong(), - anyLong())).thenAnswer(invocation -> ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails())); - when(nodeManager.getTotalDatanodeCommandCounts(any(DatanodeDetails.class), any(), any())) - .thenAnswer(invocation -> { - Map map = new HashMap<>(); - map.put(SCMCommandProto.Type.replicateContainerCommand, 0); - map.put(SCMCommandProto.Type.reconstructECContainersCommand, 0); - return map; - }); - RatisUnderReplicationHandler handler = - new RatisUnderReplicationHandler(ratisPlacementPolicy, configuration, replicationManager); - - handler.processAndSendCommands(replicas, Collections.emptyList(), repQueue.dequeueUnderReplicatedContainer(), 2); - assertEquals(1, commandsSent.size()); - Pair> command = commandsSent.iterator().next(); - // a replicate command should have been sent for the UNHEALTHY replica - assertEquals(SCMCommandProto.Type.replicateContainerCommand, command.getValue().getType()); - assertEquals(unhealthy.getDatanodeDetails().getUuid(), command.getKey()); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestQuasiClosedStuckReplicationCheck.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestQuasiClosedStuckReplicationCheck.java new file mode 100644 index 000000000000..6a6dd84243d4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestQuasiClosedStuckReplicationCheck.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication.health; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.QUASI_CLOSED; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport; +import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest; +import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil; +import 
org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + + +/** + * Tests for the QuasiClosedStuckReplicationCheck class. + */ +public class TestQuasiClosedStuckReplicationCheck { + + private QuasiClosedStuckReplicationCheck handler; + private final UUID origin1 = UUID.randomUUID(); + private final UUID origin2 = UUID.randomUUID(); + private final UUID origin3 = UUID.randomUUID(); + private ReplicationManagerReport report; + private ReplicationQueue queue; + + @BeforeEach + public void setup() { + handler = new QuasiClosedStuckReplicationCheck(); + report = new ReplicationManagerReport(); + queue = new ReplicationQueue(); + } + + @Test + public void testClosedContainerReturnsFalse() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, CLOSED); + + Set containerReplicas = ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE)); + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(new ReplicationManagerReport()) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertFalse(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testQuasiClosedNotStuckReturnsFalse() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin2, 
IN_SERVICE), Pair.of(origin3, IN_SERVICE)); + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertFalse(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testQuasiClosedStuckWithOpenReturnsFalse() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin2, IN_SERVICE)); + containerReplicas.addAll(ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.OPEN, + Pair.of(origin3, IN_SERVICE))); + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertFalse(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testCorrectlyReplicated() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = 
ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), + Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE)); + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertFalse(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testNoReplicasReturnsTrue() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = new HashSet<>(); + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertTrue(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(1, report.getStat(ReplicationManagerReport.HealthState.MISSING)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testUnderReplicatedOneOriginNotHandled() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + 
.createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE)); + + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertFalse(handler.handle(request)); + } + + @Test + public void testUnderReplicatedWithPendingAddIsNotQueued() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin2, IN_SERVICE)); + + List pendingOps = new ArrayList<>(); + pendingOps.add(new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, MockDatanodeDetails.randomDatanodeDetails(), 0, null, Long.MAX_VALUE)); + + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .setPendingOps(pendingOps) + .build(); + + assertTrue(handler.handle(request)); + assertEquals(1, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.MISSING)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + + @Test + public void testOverReplicatedIsQueued() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + 
.createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), + Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE)); + + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .build(); + + assertTrue(handler.handle(request)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(1, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.MISSING)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(1, queue.overReplicatedQueueSize()); + } + + @Test + public void testOverReplicatedWithPendingDeleteIsNotQueued() { + ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo( + RatisReplicationConfig.getInstance(THREE), 1, QUASI_CLOSED); + + Set containerReplicas = ReplicationTestUtil + .createReplicasWithOriginAndOpState(containerInfo.containerID(), State.QUASI_CLOSED, + Pair.of(origin1, IN_SERVICE), Pair.of(origin1, IN_SERVICE), + Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE), Pair.of(origin2, IN_SERVICE)); + + List pendingOps = new ArrayList<>(); + pendingOps.add(new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.DELETE, MockDatanodeDetails.randomDatanodeDetails(), 0, null, Long.MAX_VALUE)); + + ContainerCheckRequest request = new ContainerCheckRequest.Builder() + .setPendingOps(Collections.emptyList()) + .setReport(report) + .setContainerInfo(containerInfo) + .setContainerReplicas(containerReplicas) + .setReplicationQueue(queue) + .setPendingOps(pendingOps) + .build(); + + assertTrue(handler.handle(request)); + assertEquals(0, 
report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED)); + assertEquals(1, report.getStat(ReplicationManagerReport.HealthState.OVER_REPLICATED)); + assertEquals(0, report.getStat(ReplicationManagerReport.HealthState.MISSING)); + assertEquals(0, queue.underReplicatedQueueSize()); + assertEquals(0, queue.overReplicatedQueueSize()); + } + +} diff --git a/hadoop-hdds/server-scm/src/test/resources/replicationManagerTests/quasi_closed.json b/hadoop-hdds/server-scm/src/test/resources/replicationManagerTests/quasi_closed.json index c9e54ded4497..8b841635b730 100644 --- a/hadoop-hdds/server-scm/src/test/resources/replicationManagerTests/quasi_closed.json +++ b/hadoop-hdds/server-scm/src/test/resources/replicationManagerTests/quasi_closed.json @@ -19,7 +19,8 @@ "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1}, "checkCommands": [], "commands": [ - { "type": "replicateContainerCommand" } + { "type": "replicateContainerCommand", "datanode": "d1" }, + { "type": "replicateContainerCommand", "datanode": "d2" } ] }, { "description": "Quasi-closed with 3 replicas 2 origins", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 12, @@ -28,9 +29,11 @@ { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 12, "isEmpty": false, "origin": "o2"}, { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 12, "isEmpty": false, "origin": "o2"} ], - "expectation": { "underReplicated": 0, "underReplicatedQueue": 0, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1}, + "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1}, "checkCommands": [], - "commands": [] + "commands": [ + { "type": "replicateContainerCommand", "datanode": "d1" } + ] }, { "description": "Quasi-closed with 3 replicas 3 origins", "containerState": "QUASI_CLOSED", 
"replicationConfig": "RATIS:THREE", "sequenceId": 12, "replicas": [ @@ -98,9 +101,45 @@ "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, "checkCommands": [], "commands": [ - { "type": "replicateContainerCommand" } + { "type": "replicateContainerCommand", "datanode": "d1" }, + { "type": "replicateContainerCommand", "datanode": "d2" }, + { "type": "replicateContainerCommand", "datanode": "d3" } ] }, + { "description": "Quasi-Closed one Origin Correctly Replicated", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, + "replicas": [ + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o1"} + ], + "expectation": { "underReplicated": 0, "underReplicatedQueue": 0, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + "checkCommands": [], + "commands": [] + }, + { "description": "Quasi-Closed two Origins Correctly Replicated", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, + "replicas": [ + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o2"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d4", "sequenceId": 10, "isEmpty": false, "origin": "o2"} + ], + "expectation": { "underReplicated": 0, "underReplicatedQueue": 0, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + 
"checkCommands": [], + "commands": [] + }, + { "description": "Quasi-Closed three Origins Correctly Replicated", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, + "replicas": [ + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o2"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d4", "sequenceId": 10, "isEmpty": false, "origin": "o2"}, + { "state": "UNHEALTHY", "index": 0, "datanode": "d5", "sequenceId": 11, "isEmpty": false, "origin": "o3"}, + { "state": "UNHEALTHY", "index": 0, "datanode": "d6", "sequenceId": 11, "isEmpty": false, "origin": "o3"} + ], + "expectation": { "underReplicated": 0, "underReplicatedQueue": 0, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + "checkCommands": [], + "commands": [] + }, { "description": "Quasi-Closed with 3 QC and one unhealthy", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, "replicas": [ { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, @@ -108,8 +147,39 @@ { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o2"}, { "state": "UNHEALTHY", "index": 0, "datanode": "d4", "sequenceId": 11, "isEmpty": false, "origin": "o3"} ], - "expectation": { "underReplicated": 0, "underReplicatedQueue": 0, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, "checkCommands": [], - "commands": [] + "commands": [ + { "type": 
"replicateContainerCommand", "datanode": "d1" }, + { "type": "replicateContainerCommand", "datanode": "d4" } + ] + }, + { "description": "Quasi-Closed 3 on one origin 1 unhealthy decommissioning", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, + "replicas": [ + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "UNHEALTHY", "index": 0, "datanode": "d4", "sequenceId": 11, "isEmpty": false, "origin": "o3", "operationalState": "DECOMMISSIONING"} + ], + "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, "overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + "checkCommands": [], + "commands": [ + { "type": "replicateContainerCommand", "datanode": "d4" }, + { "type": "replicateContainerCommand", "datanode": "d4" } + ] + }, + { "description": "Quasi-Closed 3 on one origin 2 unhealthy with 1 decommissioning", "containerState": "QUASI_CLOSED", "replicationConfig": "RATIS:THREE", "sequenceId": 11, + "replicas": [ + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d1", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d2", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "QUASI_CLOSED", "index": 0, "datanode": "d3", "sequenceId": 10, "isEmpty": false, "origin": "o1"}, + { "state": "UNHEALTHY", "index": 0, "datanode": "d4", "sequenceId": 11, "isEmpty": false, "origin": "o3", "operationalState": "DECOMMISSIONING"}, + { "state": "UNHEALTHY", "index": 0, "datanode": "d5", "sequenceId": 11, "isEmpty": false, "origin": "o3"} + ], + "expectation": { "underReplicated": 1, "underReplicatedQueue": 1, "overReplicated": 0, 
"overReplicatedQueue": 0, "quasiClosedStuck": 1, "unhealthy": 0 }, + "checkCommands": [], + "commands": [ + { "type": "replicateContainerCommand"} + ] } -] \ No newline at end of file +]