diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 9d78063a4dcd..0f30449c9750 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -208,6 +208,29 @@ public List<DatanodeDetails> chooseDatanodes(
     }
   }
 
+  // Fall back logic for node pick up.
+  DatanodeDetails fallBackPickNodes(
+      List<DatanodeDetails> nodeSet, List<DatanodeDetails> excludedNodes)
+      throws SCMException {
+    DatanodeDetails node;
+    if (excludedNodes == null || excludedNodes.isEmpty()) {
+      node = chooseNode(nodeSet);
+    } else {
+      List<DatanodeDetails> inputNodes = nodeSet.stream()
+          .filter(p -> !excludedNodes.contains(p)).collect(Collectors.toList());
+      node = chooseNode(inputNodes);
+    }
+
+    if (node == null) {
+      String msg = String.format("Unable to find fall back node in"
+          + " pipeline allocation. nodeSet size: %d", nodeSet.size());
+      LOG.warn(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return node;
+  }
+
   /**
    * Get result set based on the pipeline placement algorithm which considers
    * network topology and rack awareness.
@@ -220,50 +243,59 @@ public List<DatanodeDetails> chooseDatanodes(
   public List<DatanodeDetails> getResultSet(
       int nodesRequired, List<DatanodeDetails> healthyNodes)
       throws SCMException {
+    if (nodesRequired != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      throw new SCMException("Nodes required number is not supported: " +
+          nodesRequired, SCMException.ResultCodes.INVALID_CAPACITY);
+    }
+
+    // Assume rack awareness is not enabled.
+    boolean rackAwareness = false;
     List<DatanodeDetails> results = new ArrayList<>(nodesRequired);
     // Since nodes are widely distributed, the results should be selected
     // base on distance in topology, rack awareness and load balancing.
     List<DatanodeDetails> exclude = new ArrayList<>();
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
-    if (anchor == null) {
-      LOG.warn("Unable to find healthy node for anchor(first) node." +
-          " Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
-      throw new SCMException("Unable to find required number of nodes.",
+    if (anchor != null) {
+      results.add(anchor);
+      exclude.add(anchor);
+    } else {
+      LOG.warn("Unable to find healthy node for anchor(first) node.");
+      throw new SCMException("Unable to find anchor node.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("First node chosen: {}", anchor);
     }
-    results.add(anchor);
-    exclude.add(anchor);
     // Choose the second node on different racks from anchor.
-    DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+    DatanodeDetails nextNode = chooseNodeBasedOnRackAwareness(
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
-    if (nodeOnDifferentRack == null) {
-      LOG.warn("Pipeline Placement: Unable to find 2nd node on different " +
-          "racks that meets the criteria. Required nodes: {}, Found nodes:" +
-          " {}", nodesRequired, results.size());
-      throw new SCMException("Unable to find required number of nodes.",
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Second node chosen: {}", nodeOnDifferentRack);
-    }
+    if (nextNode != null) {
+      // Rack awareness is detected.
+      rackAwareness = true;
+      results.add(nextNode);
+      exclude.add(nextNode);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Second node chosen: {}", nextNode);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
+            "rack based on rack awareness.");
+      }
+    }
-    results.add(nodeOnDifferentRack);
-    exclude.add(nodeOnDifferentRack);
-
     // Then choose nodes close to anchor based on network topology
     int nodesToFind = nodesRequired - results.size();
     for (int x = 0; x < nodesToFind; x++) {
-      // invoke the choose function defined in the derived classes.
-      DatanodeDetails pick = chooseNodeFromNetworkTopology(
-          nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+      // Pick remaining nodes based on the existence of rack awareness.
+      DatanodeDetails pick = rackAwareness
+          ? chooseNodeFromNetworkTopology(
+          nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
+          : fallBackPickNodes(healthyNodes, exclude);
       if (pick != null) {
         results.add(pick);
         exclude.add(pick);
@@ -293,6 +325,9 @@ public List<DatanodeDetails> getResultSet(
   @Override
   public DatanodeDetails chooseNode(
       List<DatanodeDetails> healthyNodes) {
+    if (healthyNodes == null || healthyNodes.isEmpty()) {
+      return null;
+    }
     int firstNodeNdx = getRand().nextInt(healthyNodes.size());
     int secondNodeNdx = getRand().nextInt(healthyNodes.size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9aa9afb0518..daad80834c5a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -83,10 +83,66 @@ public void testChooseNodeBasedOnRackAwareness() {
     DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
         healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
         topologyWithDifRacks, anchor);
+    Assert.assertNotNull(nextNode);
     Assert.assertFalse(anchor.getNetworkLocation().equals(
         nextNode.getNetworkLocation()));
   }
 
+  @Test
+  public void testFallBackPickNodes() {
+    List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+    DatanodeDetails node;
+    try {
+      node = placementPolicy.fallBackPickNodes(healthyNodes, null);
+      Assert.assertNotNull(node);
+    } catch (SCMException e) {
+      Assert.fail("Should not reach here.");
+    }
+
+    // when input nodeSet are all excluded.
+    List<DatanodeDetails> exclude = healthyNodes;
+    try {
+      node = placementPolicy.fallBackPickNodes(healthyNodes, exclude);
+      Assert.assertNull(node);
+    } catch (SCMException e) {
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          e.getResult());
+    } catch (Exception ex) {
+      Assert.fail("Should not reach here.");
+    }
+  }
+
+  @Test
+  public void testRackAwarenessNotEnabledWithFallBack() throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
+    // rack awareness is not enabled.
+    Assert.assertTrue(anchor.getNetworkLocation().equals(
+        randomNode.getNetworkLocation()));
+
+    NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+    DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+        healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        topology, anchor);
+    // RackAwareness should not be able to choose any node.
+    Assert.assertNull(nextNode);
+
+    // PlacementPolicy should still be able to pick a set of 3 nodes.
+    int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> results = placementPolicy
+        .getResultSet(numOfNodes, healthyNodes);
+
+    Assert.assertEquals(numOfNodes, results.size());
+    // All nodes are on same rack.
+ Assert.assertEquals(results.get(0).getNetworkLocation(), + results.get(1).getNetworkLocation()); + Assert.assertEquals(results.get(0).getNetworkLocation(), + results.get(2).getNetworkLocation()); + } + private final static Node[] NODES = new NodeImpl[] { new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT), new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),