diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 524b5ec8b216..84efdc2e1403 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -56,6 +56,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   private final int heavyNodeCriteria;
   private static final int REQUIRED_RACKS = 2;
 
+  public static final String MULTIPLE_RACK_PIPELINE_MSG =
+      "The cluster has multiple racks, but all nodes with available " +
+      "pipeline capacity are on a single rack. There are insufficient " +
+      "cross rack nodes available to create a pipeline";
+
   /**
    * Constructs a pipeline placement with considering network topology,
    * load balancing and rack awareness.
@@ -120,6 +125,7 @@ List<DatanodeDetails> filterViableNodes(
     // get nodes in HEALTHY state
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    boolean multipleRacks = multipleRacksAvailable(healthyNodes);
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
@@ -163,9 +169,38 @@ List<DatanodeDetails> filterViableNodes(
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
+
+    if (!checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+      boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
+      if (multipleRacks && !multipleRacksAfterFilter) {
+        LOG.debug(MULTIPLE_RACK_PIPELINE_MSG);
+        throw new SCMException(MULTIPLE_RACK_PIPELINE_MSG,
+            SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      }
+    }
     return healthyList;
   }
 
+  /**
+   * Given a list of Datanodes, return false if the entire list is only on a
+   * single rack, or the list is empty. If there is more than 1 rack, return
+   * true.
+   * @param dns List of datanodes to check
+   * @return True if there are multiple racks, false otherwise
+   */
+  private boolean multipleRacksAvailable(List<DatanodeDetails> dns) {
+    if (dns.size() <= 1) {
+      return false;
+    }
+    String initialRack = dns.get(0).getNetworkLocation();
+    for (DatanodeDetails dn : dns) {
+      if (!dn.getNetworkLocation().equals(initialRack)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Pipeline placement choose datanodes to join the pipeline.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 5b635a7bee94..4b8b37dee273 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -390,6 +390,19 @@ public void clearCommandQueue(UUID dnId) {
     }
   }
 
+  public void setNodeState(DatanodeDetails dn, HddsProtos.NodeState state) {
+    healthyNodes.remove(dn);
+    staleNodes.remove(dn);
+    deadNodes.remove(dn);
+    if (state == HEALTHY) {
+      healthyNodes.add(dn);
+    } else if (state == STALE) {
+      staleNodes.add(dn);
+    } else {
+      deadNodes.add(dn);
+    }
+  }
+
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 8d6a28cc2a0d..1274608c39c2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -44,7 +44,9 @@
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -439,6 +441,85 @@ public void testValidatePlacementPolicySingleRackInCluster() {
     assertEquals(0, status.misReplicationCount());
   }
 
+  @Test
+  public void test3NodesInSameRackReturnedWhenOnlyOneHealthyRackIsPresent()
+      throws Exception {
+    List<DatanodeDetails> dns = setupSkewedRacks();
+
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+    // Set the only node on rack1 stale. This makes the cluster effectively a
+    // single rack.
+    nodeManager.setNodeState(dns.get(0), HddsProtos.NodeState.STALE);
+
+    // As there is only 1 rack alive, the 3 DNs on /rack2 should be returned
+    List<DatanodeDetails> pickedDns = placementPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+
+    assertEquals(3, pickedDns.size());
+    assertTrue(pickedDns.contains(dns.get(1)));
+    assertTrue(pickedDns.contains(dns.get(2)));
+    assertTrue(pickedDns.contains(dns.get(3)));
+  }
+
+  @Rule
+  public ExpectedException thrownExp = ExpectedException.none();
+
+  @Test
+  public void testExceptionIsThrownWhenRackAwarePipelineCanNotBeCreated()
+      throws Exception {
+    thrownExp.expect(SCMException.class);
+    thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);
+
+    List<DatanodeDetails> dns = setupSkewedRacks();
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    placementPolicy.chooseDatanodes(
+        new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
+  }
+
+  @Test
+  public void testExceptionThrownRackAwarePipelineCanNotBeCreatedExcludedNode()
+      throws Exception {
+    thrownExp.expect(SCMException.class);
+    thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);
+
+    List<DatanodeDetails> dns = setupSkewedRacks();
+
+    // Set the first node to its pipeline limit. This means there are only
+    // 3 hosts on a single rack available for new pipelines
+    insertHeavyNodesIntoNodeManager(dns, 1);
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+
+    List<DatanodeDetails> excluded = new ArrayList<>();
+    excluded.add(dns.get(0));
+    placementPolicy.chooseDatanodes(
+        excluded, new ArrayList<>(), nodesRequired, 0);
+  }
+
+  private List<DatanodeDetails> setupSkewedRacks() {
+    cluster = initTopology();
+
+    List<DatanodeDetails> dns = new ArrayList<>();
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host1", "/rack1"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host2", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host3", "/rack2"));
+    dns.add(MockDatanodeDetails
+        .createDatanodeDetails("host4", "/rack2"));
+
+    nodeManager = new MockNodeManager(cluster, dns,
+        false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, stateManager, conf);
+    return dns;
+  }
+
   private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
     HashSet<UUID> uuids = nodes.stream().
         map(DatanodeDetails::getUuid).