diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index af57775fd642..f07b1c5a6f31 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -62,12 +63,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
   private final long currentContainerSize;
   private final ReplicationManager replicationManager;
 
-  private static class CannotFindTargetsException extends IOException {
-    CannotFindTargetsException(Throwable cause) {
-      super(cause);
-    }
-  }
-
   public ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
       final ConfigurationSource conf, ReplicationManager replicationManager) {
     this.containerPlacement = containerPlacement;
@@ -164,49 +159,60 @@ public int processAndSendCommands(
         .collect(Collectors.toList());
 
     try {
-      commandsSent += processMissingIndexes(replicaCount, sources,
-          availableSourceNodes, excludedNodes);
-      commandsSent += processDecommissioningIndexes(replicaCount, sources,
-          availableSourceNodes, excludedNodes);
-      commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
-          excludedNodes);
-      // TODO - we should be able to catch SCMException here and check the
-      // result code but the RackAware topology never sets the code.
-    } catch (CannotFindTargetsException e) {
-      // If we get here, we tried to find nodes to fix the under replication
-      // issues, but were not able to find any at some stage, and the
-      // placement policy threw an exception.
-      // At this stage. If the cluster is small and there are some
-      // over replicated indexes, it could stop us finding a new node as there
-      // are no more nodes left to try.
-      // If the container is also over replicated, then hand it off to the
-      // over-rep handler, and after those over-rep indexes are cleared the
-      // under replication can be re-tried in the next iteration of RM.
-      // However, we should only hand off to the over rep handler if there are
-      // no commands already created. If we have some commands, they may
-      // attempt to use sources the over-rep handler would remove. So we
-      // should let the commands we have created be processed, and then the
-      // container will be re-processed in a further RM pass.
-      LOG.debug("Unable to located new target nodes for container {}",
-          container, e);
-      if (commandsSent > 0) {
-        LOG.debug("Some commands have already been created, so returning " +
-            "with them only");
-        return commandsSent;
+      InsufficientDatanodesException firstException = null;
+      try {
+        commandsSent += processMissingIndexes(replicaCount, sources,
+            availableSourceNodes, excludedNodes);
+      } catch (InsufficientDatanodesException e) {
+        firstException = e;
+      }
+      try {
+        commandsSent += processDecommissioningIndexes(replicaCount, sources,
+            availableSourceNodes, excludedNodes);
+      } catch (InsufficientDatanodesException e) {
+        if (firstException == null) {
+          firstException = e;
+        }
       }
+      try {
+        commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
+            excludedNodes);
+      } catch (InsufficientDatanodesException e) {
+        if (firstException == null) {
+          firstException = e;
+        }
+      }
+      if (firstException != null) {
+        // We had partial success through some of the steps, so just throw the
+        // first exception we got. This will cause the container to be
+        // re-queued and try again later.
+        throw firstException;
+      }
+    } catch (SCMException e) {
+      SCMException.ResultCodes code = e.getResult();
+      if (code != SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) {
+        throw e;
+      }
+      // If we get here, we got an exception indicating the placement policy
+      // was not able to find ANY nodes to satisfy the replication at one of
+      // the processing stages (missing index, decom or maint). It is
+      // possible that some commands were sent to partially fix the
+      // replication, but a further run will be needed to fix the rest.
+      // On a small cluster, it is possible that over replication could stop
+      // nodes getting selected, so to check if that is the case, we run
+      // the over rep handler, which may free some nodes for the next run.
       if (replicaCount.isOverReplicated()) {
         LOG.debug("Container {} is both under and over replicated. Cannot " +
            "find enough target nodes, so handing off to the " +
            "OverReplication handler", container);
-        return replicationManager.processOverReplicatedContainer(result);
-      } else {
-        throw (SCMException)e.getCause();
+        replicationManager.processOverReplicatedContainer(result);
       }
+      // As we want to re-queue and try again later, we just re-throw
+      throw e;
     } catch (IOException | IllegalStateException ex) {
-      LOG.warn("Exception while processing for creating the EC reconstruction" +
-          " container commands for container {}.",
-          id, ex);
+      LOG.warn("Exception while creating the replication or" +
+          " reconstruction commands for container {}.", id, ex);
       throw ex;
     }
     if (commandsSent == 0) {
@@ -253,25 +259,6 @@ private Map<Integer, Pair<ContainerReplica, NodeStatus>> filterSources(
         }));
   }
 
-  private List<DatanodeDetails> getTargetDatanodes(
-      List<DatanodeDetails> excludedNodes, ContainerInfo container,
-      int requiredNodes) throws IOException {
-    // We should ensure that the target datanode has enough space
-    // for a complete container to be created, but since the container
-    // size may be changed smaller than origin, we should be defensive.
-    final long dataSizeRequired =
-        Math.max(container.getUsedBytes(), currentContainerSize);
-    try {
-      return containerPlacement
-          .chooseDatanodes(excludedNodes, null, requiredNodes, 0,
-              dataSizeRequired);
-    } catch (SCMException e) {
-      // SCMException can come from many places in SCM, so catch it here and
-      // throw a more specific exception instead.
-      throw new CannotFindTargetsException(e);
-    }
-  }
-
   /**
    * Processes replicas that are in maintenance nodes and should need
    * additional copies.
@@ -293,9 +280,19 @@ private int processMissingIndexes(
     int commandsSent = 0;
     if (sources.size() >= repConfig.getData()) {
-      final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
-          excludedNodes, container, missingIndexes.size());
+      int expectedTargets = missingIndexes.size();
+      final List<DatanodeDetails> selectedDatanodes =
+          ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+              expectedTargets, null, excludedNodes, currentContainerSize,
+              container);
+      // If we got fewer targets than missing indexes, we need to prune the
+      // missing index list so it only tries to recover the number of indexes
+      // we have targets for.
+      if (selectedDatanodes.size() < expectedTargets) {
+        missingIndexes.subList(selectedDatanodes.size(),
+            missingIndexes.size()).clear();
+      }
       if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
         excludedNodes.addAll(selectedDatanodes);
         // TODO - what are we adding all the selected nodes to available
@@ -324,6 +321,14 @@ private int processMissingIndexes(
         }
         commandsSent++;
       }
+      if (selectedDatanodes.size() != expectedTargets) {
+        LOG.debug("Insufficient nodes were returned from the placement policy" +
+            " to fully reconstruct container {}. Requested {} received {}",
+            container.getContainerID(), expectedTargets,
+            selectedDatanodes.size());
+        throw new InsufficientDatanodesException(missingIndexes.size(),
+            selectedDatanodes.size());
+      }
     } else {
       LOG.warn("Cannot proceed for EC container reconstruction for {}, due" +
           " to insufficient source replicas found. Number of source "
@@ -350,7 +355,10 @@ private int processDecommissioningIndexes(
     int commandsSent = 0;
     if (decomIndexes.size() > 0) {
       final List<DatanodeDetails> selectedDatanodes =
-          getTargetDatanodes(excludedNodes, container, decomIndexes.size());
+          ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+              decomIndexes.size(), null, excludedNodes, currentContainerSize,
+              container);
+
       if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
         excludedNodes.addAll(selectedDatanodes);
         Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
@@ -377,6 +385,14 @@ private int processDecommissioningIndexes(
           commandsSent++;
         }
       }
+      if (selectedDatanodes.size() != decomIndexes.size()) {
+        LOG.debug("Insufficient nodes were returned from the placement policy" +
+            " to fully replicate the decommission indexes for container {}." +
+            " Requested {} received {}", container.getContainerID(),
+            decomIndexes.size(), selectedDatanodes.size());
+        throw new InsufficientDatanodesException(decomIndexes.size(),
+            selectedDatanodes.size());
+      }
     }
     return commandsSent;
   }
@@ -407,8 +423,9 @@ private int processMaintenanceOnlyIndexes(
     if (additionalMaintenanceCopiesNeeded == 0) {
       return 0;
     }
-    List<DatanodeDetails> targets = getTargetDatanodes(excludedNodes, container,
-        additionalMaintenanceCopiesNeeded);
+    List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
+        containerPlacement, maintIndexes.size(), null, excludedNodes,
+        currentContainerSize, container);
     excludedNodes.addAll(targets);
 
     Iterator<DatanodeDetails> iterator = targets.iterator();
@@ -439,6 +456,14 @@ private int processMaintenanceOnlyIndexes(
       commandsSent++;
       additionalMaintenanceCopiesNeeded -= 1;
     }
+    if (targets.size() != maintIndexes.size()) {
+      LOG.debug("Insufficient nodes were returned from the placement policy" +
+          " to fully replicate the maintenance indexes for container {}." +
+          " Requested {} received {}", container.getContainerID(),
+          maintIndexes.size(), targets.size());
+      throw new InsufficientDatanodesException(maintIndexes.size(),
+          targets.size());
+    }
     return commandsSent;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index abf7afe309b3..4ab454955651 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -76,25 +75,6 @@ protected abstract ContainerReplicaCount getContainerReplicaCount(
       List<ContainerReplicaOp> pendingOps, int remainingMaintenanceRedundancy)
       throws IOException;
 
-  private List<DatanodeDetails> getTargetDatanodes(
-      List<DatanodeDetails> usedNodes, List<DatanodeDetails> excludedNodes,
-      ContainerInfo container, int requiredNodes) throws IOException {
-    final long dataSizeRequired =
-        Math.max(container.getUsedBytes(), currentContainerSize);
-    while (requiredNodes > 0) {
-      try {
-        return containerPlacement.chooseDatanodes(usedNodes, excludedNodes,
-            null, requiredNodes, 0, dataSizeRequired);
-      } catch (IOException e) {
-        requiredNodes -= 1;
-      }
-    }
-    throw new SCMException(String.format("Placement Policy: %s did not return"
-        + " any nodes. Number of required Nodes %d, Datasize Required: %d",
-        containerPlacement.getClass(), requiredNodes, dataSizeRequired),
-        SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-  }
-
   private Set<ContainerReplica> filterSources(Set<ContainerReplica> replicas) {
     return replicas.stream()
         .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
@@ -196,8 +176,10 @@ public int processAndSendCommands(
         .map(ContainerReplica::getDatanodeDetails)
         .collect(Collectors.toList());
     int requiredNodes = replicasToBeReplicated.size();
-    List<DatanodeDetails> targetDatanodes = getTargetDatanodes(usedDns,
-        excludedDns, container, requiredNodes);
+
+    List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
+        .getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
+            excludedDns, currentContainerSize, container);
 
     int count = sendReplicateCommands(container, replicasToBeReplicated,
         targetDatanodes);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 3f82bbb0eb41..cac0be0ee3c1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -26,7 +26,6 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -241,28 +240,9 @@ private List<DatanodeDetails> getTargets(
         .collect(Collectors.toList());
     excludeList.addAll(pendingReplication);
 
-    /*
-    Ensure that target datanodes have enough space to hold a complete
-    container.
-    */
-    final long dataSizeRequired =
-        Math.max(replicaCount.getContainer().getUsedBytes(),
-            currentContainerSize);
-    int requiredNodes = replicaCount.additionalReplicaNeeded();
-    while (requiredNodes > 0) {
-      try {
-        return placementPolicy.chooseDatanodes(excludeList, null,
-            requiredNodes, 0, dataSizeRequired);
-      } catch (IOException e) {
-        LOG.debug("Placement policy was not able to return {} nodes. ",
-            requiredNodes, e);
-        requiredNodes--;
-      }
-    }
-    throw new SCMException(String.format("Placement Policy: %s did not return"
-        + " any nodes. Number of required Nodes %d, Datasize Required: %d",
-        placementPolicy.getClass(), requiredNodes, dataSizeRequired),
-        SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
+        replicaCount.additionalReplicaNeeded(), null, excludeList,
+        currentContainerSize, replicaCount.getContainer());
   }
 
   private int sendReplicationCommands(

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
new file mode 100644
index 000000000000..12b1eecf6128
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility class for ReplicationManager.
+ */
+public final class ReplicationManagerUtil {
+
+  private ReplicationManagerUtil() {
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ReplicationManagerUtil.class);
+
+  /**
+   * Using the passed placement policy attempt to select a list of datanodes
+   * to use as new targets. If the placement policy is unable to select
+   * enough nodes, the number of nodes requested will be reduced by 1 and the
+   * placement policy will be called again. This will continue until the
+   * placement policy is able to select enough nodes or the number of nodes
+   * requested is reduced to zero, at which point an exception is thrown.
+   * @param policy The placement policy used to select nodes.
+   * @param requiredNodes The number of nodes required.
+   * @param usedNodes Any nodes already used by the container.
+   * @param excludedNodes Any excluded nodes which cannot be selected.
+   * @param defaultContainerSize The cluster default max container size.
+   * @param container The container to select new replicas for.
+   * @return A list of up to requiredNodes datanodes to use as targets for
+   *         new replicas. Note the number of nodes returned may be less than
+   *         the number of nodes requested if the placement policy is unable
+   *         to return enough nodes.
+   * @throws SCMException If no nodes can be selected.
+   */
+  public static List<DatanodeDetails> getTargetDatanodes(PlacementPolicy policy,
+      int requiredNodes, List<DatanodeDetails> usedNodes,
+      List<DatanodeDetails> excludedNodes, long defaultContainerSize,
+      ContainerInfo container) throws SCMException {
+
+    // Ensure that target datanodes have enough space to hold a complete
+    // container.
+    final long dataSizeRequired =
+        Math.max(container.getUsedBytes(), defaultContainerSize);
+
+    int mutableRequiredNodes = requiredNodes;
+    while (mutableRequiredNodes > 0) {
+      try {
+        if (usedNodes == null) {
+          return policy.chooseDatanodes(excludedNodes, null,
+              mutableRequiredNodes, 0, dataSizeRequired);
+        } else {
+          return policy.chooseDatanodes(usedNodes, excludedNodes, null,
+              mutableRequiredNodes, 0, dataSizeRequired);
+        }
+      } catch (IOException e) {
+        LOG.debug("Placement policy was not able to return {} nodes for " +
+            "container {}.",
+            mutableRequiredNodes, container.getContainerID(), e);
+        mutableRequiredNodes--;
+      }
+    }
+    throw new SCMException(String.format("Placement Policy: %s did not return"
+        + " any nodes. Number of required Nodes %d, Datasize Required: %d",
+        policy.getClass(), requiredNodes, dataSizeRequired),
+        SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+  }
+
+}
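The consolidated helper replaces three nearly identical loops in the handlers above. Its key behaviour is that it degrades the request rather than failing it: if the policy cannot satisfy N nodes it retries with N-1, and only throws FAILED_TO_FIND_SUITABLE_NODE once even a single node cannot be found. The following self-contained sketch shows the same loop against a hypothetical Chooser interface standing in for PlacementPolicy#chooseDatanodes; it illustrates the technique and is not the Ozone API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class DegradingSelectionSketch {

  /** Hypothetical stand-in for PlacementPolicy#chooseDatanodes. */
  interface Chooser {
    List<String> choose(int count) throws IOException;
  }

  static List<String> selectUpTo(Chooser chooser, int requiredNodes)
      throws IOException {
    int remaining = requiredNodes;
    while (remaining > 0) {
      try {
        // Ask for the current count; the chooser either satisfies it fully
        // or throws, in which case we retry asking for one node fewer.
        return chooser.choose(remaining);
      } catch (IOException e) {
        remaining--;
      }
    }
    // Mirrors the SCMException(FAILED_TO_FIND_SUITABLE_NODE) in the patch.
    throw new IOException("Placement policy did not return any nodes for a "
        + "request of " + requiredNodes);
  }

  public static void main(String[] args) throws IOException {
    // A chooser that can only ever supply two nodes.
    Chooser twoNodeCluster = count -> {
      if (count > 2) {
        throw new IOException("Only 2 nodes available");
      }
      List<String> nodes = new ArrayList<>();
      for (int i = 0; i < count; i++) {
        nodes.add("dn-" + i);
      }
      return nodes;
    };
    // Requesting 4 degrades to 2 rather than failing outright.
    System.out.println(selectUpTo(twoNodeCluster, 4)); // [dn-0, dn-1]
  }
}

Callers can therefore compare the returned list size with what they asked for, send commands for the targets they did get, and raise InsufficientDatanodesException for the shortfall, which is what the handlers above now do.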
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index b42add9ecf71..09f10bdf5fe5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -24,7 +24,7 @@
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
@@ -461,14 +462,87 @@ public void testUnderRepSentToOverRepHandlerIfNoNewNodes()
         return expectedDelete.size();
       });
       commandsSent.clear();
-      ecURH.processAndSendCommands(availableReplicas,
-          Collections.emptyList(), underRep, 2);
+      assertThrows(SCMException.class,
+          () -> ecURH.processAndSendCommands(availableReplicas,
+              Collections.emptyList(), underRep, 2));
       Mockito.verify(replicationManager, times(1))
           .processOverReplicatedContainer(underRep);
       Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
     }
   }
 
+  @Test
+  public void testPartialReconstructionIfNotEnoughNodes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3));
+    PlacementPolicy placementPolicy = ReplicationTestUtil
+        .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        placementPolicy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, false, false, false);
+
+    assertThrows(InsufficientDatanodesException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas,
+            Collections.emptyList(), underRep, 1));
+    Assertions.assertEquals(1, commandsSent.size());
+    ReconstructECContainersCommand cmd = (ReconstructECContainersCommand)
+        commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
+  }
+
+  @Test
+  public void testPartialDecommissionIfNotEnoughNodes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+    PlacementPolicy placementPolicy = ReplicationTestUtil
+        .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        placementPolicy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, true, false, false);
+
+    assertThrows(InsufficientDatanodesException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas,
+            Collections.emptyList(), underRep, 1));
+    Assertions.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(
+        SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+  }
+
+  @Test
+  public void testPartialMaintenanceIfNotEnoughNodes() {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(IN_SERVICE, 1),
+            Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(ENTERING_MAINTENANCE, 4),
+            Pair.of(ENTERING_MAINTENANCE, 5));
+    PlacementPolicy placementPolicy = ReplicationTestUtil
+        .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+    ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+        placementPolicy, conf, replicationManager);
+
+    ContainerHealthResult.UnderReplicatedHealthResult underRep =
+        new ContainerHealthResult.UnderReplicatedHealthResult(container,
+            0, false, false, false);
+
+    assertThrows(InsufficientDatanodesException.class, () ->
+        ecURH.processAndSendCommands(availableReplicas,
+            Collections.emptyList(), underRep, 2));
+    Assertions.assertEquals(1, commandsSent.size());
+    SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+    Assertions.assertEquals(
+        SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+  }
+
   @Test
   public void testUnderRepWithDecommissionAndNotEnoughNodes()
       throws IOException {
@@ -511,18 +585,20 @@ public void testUnderRepWithDecommissionAndNotEnoughNodes()
         availableReplicas.add(toAdd);
       }
 
-      ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
-          underRep, 2);
+      assertThrows(SCMException.class,
+          () -> ecURH.processAndSendCommands(availableReplicas,
+              Collections.emptyList(), underRep, 2));
 
-      Mockito.verify(replicationManager, times(0))
+      Mockito.verify(replicationManager, times(1))
          .processOverReplicatedContainer(underRep);
 
      Assertions.assertEquals(1, commandsSent.size());
      Pair<DatanodeDetails, SCMCommand<?>> pair = commandsSent.iterator().next();
      Assertions.assertEquals(newNode, pair.getKey());
-     Assertions.assertEquals(StorageContainerDatanodeProtocolProtos
-         .SCMCommandProto.Type.reconstructECContainersCommand,
+     Assertions.assertEquals(
+         SCMCommandProto.Type.reconstructECContainersCommand,
          pair.getValue().getType());
+     Mockito.clearInvocations(replicationManager);
      commandsSent.clear();
    }
  }
@@ -552,9 +628,8 @@ public void testUnderRepDueToDecomAndOverRep()
         () -> ecURH.processAndSendCommands(availableReplicas,
             Collections.emptyList(), underRep, 1));
 
-    // Now adjust replicas so it is also over replicated. This time rather than
-    // throwing it should call the OverRepHandler and return whatever it
-    // returns, which in this case is a delete command for replica index 4.
+    // Now adjust replicas so it is also over replicated. This time it should
+    // call the OverRepHandler and then throw.
     ContainerReplica overRepReplica =
         ReplicationTestUtil.createContainerReplica(container.containerID(),
             4, IN_SERVICE, CLOSED);
@@ -569,8 +644,8 @@ public void testUnderRepDueToDecomAndOverRep()
       commandsSent.addAll(expectedDelete);
       return expectedDelete.size();
     });
-    ecURH.processAndSendCommands(availableReplicas,
-        Collections.emptyList(), underRep, 1);
+    assertThrows(SCMException.class, () -> ecURH.processAndSendCommands(
+        availableReplicas, Collections.emptyList(), underRep, 1));
     Mockito.verify(replicationManager, times(1))
         .processOverReplicatedContainer(underRep);
     Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
@@ -594,16 +669,17 @@ public void testMissingAndDecomIndexWithOnlyOneNewNodeAvailable()
     commandsSent.clear();
 
     // Now add a decommissioning index - we will not get a replicate command
-    // for it, as the placement policy will throw an exception as we catch
-    // and just return the first reconstruction command. This will not goto
-    // the over-rep handler as we have a command already created for the under
-    // replication, even if the container is over replicated too.
+    // for it, as the placement policy will throw an exception which will
+    // come up the stack and be thrown out to indicate this container must be
+    // retried.
     Set<ContainerReplica> replicas = ReplicationTestUtil
         .createReplicas(Pair.of(DECOMMISSIONING, 1),
             Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
            Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 4));
-    testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
-        replicas, 0, 0, sameNodePolicy);
+
+    assertThrows(SCMException.class, () ->
+        testUnderReplicationWithMissingIndexes(ImmutableList.of(5), replicas,
+            0, 0, sameNodePolicy));
   }
 
   @Test
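The new tests rely on ReplicationTestUtil.getInsufficientNodesTestPlacementPolicy, which is not part of this patch. As a rough illustration of what such a policy needs to do, the sketch below stubs the five-argument PlacementPolicy#chooseDatanodes overload that the production code calls, so that it refuses multi-node requests and supplies a single node once the caller has degraded its request. The real test helper may be implemented quite differently.

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.mockito.Mockito;

final class InsufficientNodesPolicySketch {

  private InsufficientNodesPolicySketch() {
  }

  static PlacementPolicy newPolicy() throws IOException {
    PlacementPolicy policy = Mockito.mock(PlacementPolicy.class);
    Mockito.when(policy.chooseDatanodes(Mockito.anyList(), Mockito.any(),
            Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong()))
        .thenAnswer(invocation -> {
          int requested = invocation.getArgument(2);
          if (requested > 1) {
            // Refuse full-size requests so callers degrade the request,
            // exercising the partial-success paths added in this patch.
            throw new SCMException("No nodes available",
                SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
          }
          return Collections.singletonList(
              MockDatanodeDetails.randomDatanodeDetails());
        });
    return policy;
  }
}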