Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,42 @@ List<DatanodeDetails> filterViableNodes(
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}

if (!checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
boolean multipleRacks = multipleRacksAvailable(healthyNodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also consider excludedNodes here?

Copy link
Contributor

@fapifta fapifta Aug 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line 124, if there are exclusions, they are removed from the healthyNodes list; then, based on the remaining elements, healthyList is created by filtering out the nodes that cannot accept more pipelines.
At this point with the healthyNodes list we are checking whether there were multiple racks configured for the healthy nodes, as we need to know that we are in a multi rack environment. (If the whole cluster has just one rack, then even though we have rack awareness logic, we can not allocate pipelines in two racks, hence this check is needed.)
The second check happens after filtering out the nodes that cannot accept more pipelines, and the intention is to throw an exception if the remaining nodes are all in one single rack — in that case we have multiple racks with healthy nodes, yet we still can't allocate a rack-aware pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the first call to multipleRacksAvailable(...) we just use all healthy nodes and don't worry about excluded nodes. As Pifta said, excluded nodes are handled later and this first call is just to check if the cluster has multiple alive racks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take the same example given in the description,
Rack 1 = 10 nodes; Rack 2 = 1 node

For some reason, the one node which is in Rack 2 is added to the exclude list. According to SCM the node in Rack 2 is still healthy.

We will end up creating a pipeline in the same rack (Rack 1), even though we have two racks. We will run into the same scenario that this PR is trying to address.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct. Checking this again, I see at line 121/126 the excluded nodes are removed from the healthyList:

    List<DatanodeDetails> healthyNodes =
        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
    if (excludedNodes != null) {
      healthyNodes.removeAll(excludedNodes);
    }

So I need to move the line:

boolean multipleRacks = multipleRacksAvailable(healthyNodes);

To before removing the excluded nodes.

boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
if (multipleRacks && !multipleRacksAfterFilter) {
msg = "The cluster has multiple racks, but all nodes with available " +
"pipeline capacity are on a single rack. There are insufficient " +
"cross rack nodes available to create a pipeline";
LOG.debug(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
}
return healthyList;
}

/**
 * Determines whether the given datanodes span more than one rack.
 * An empty or single-element list is never considered multi-rack.
 * @param dns List of datanodes to check
 * @return True if there are multiple racks, false otherwise
 */
private boolean multipleRacksAvailable(List<DatanodeDetails> dns) {
  if (dns.size() <= 1) {
    return false;
  }
  // Any node whose network location differs from the first node's
  // proves at least two racks are present.
  final String firstRack = dns.get(0).getNetworkLocation();
  return dns.stream()
      .anyMatch(dn -> !dn.getNetworkLocation().equals(firstRack));
}

/**
* Pipeline placement choose datanodes to join the pipeline.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,19 @@ public void clearCommandQueue(UUID dnId) {
}
}

/**
 * Re-classifies a datanode into the list matching the requested state.
 * The node is first removed from every state list so that afterwards it
 * appears in exactly one of them. Any state other than HEALTHY or STALE
 * is treated as dead.
 * @param dn the datanode to move
 * @param state the target node state
 */
public void setNodeState(DatanodeDetails dn, HddsProtos.NodeState state) {
  // Drop the node from all buckets before re-adding to avoid duplicates.
  healthyNodes.remove(dn);
  staleNodes.remove(dn);
  deadNodes.remove(dn);
  if (state == HEALTHY) {
    healthyNodes.add(dn);
    return;
  }
  if (state == STALE) {
    staleNodes.add(dn);
    return;
  }
  deadNodes.add(dn);
}

/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;

import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -439,6 +440,50 @@ public void testValidatePlacementPolicySingleRackInCluster() {
assertEquals(0, status.misReplicationCount());
}

@Test
public void testPreventNonRackAwarePipelinesWithSkewedRacks()
    throws Exception {
  cluster = initTopology();

  // Skewed topology: one node on /rack1 and three nodes on /rack2.
  String[][] hostsAndRacks = {
      {"host1", "/rack1"},
      {"host2", "/rack2"},
      {"host3", "/rack2"},
      {"host4", "/rack2"},
  };
  List<DatanodeDetails> dns = new ArrayList<>();
  for (String[] hostAndRack : hostsAndRacks) {
    dns.add(MockDatanodeDetails
        .createDatanodeDetails(hostAndRack[0], hostAndRack[1]));
  }

  nodeManager = new MockNodeManager(cluster, dns,
      false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
  placementPolicy = new PipelinePlacementPolicy(
      nodeManager, stateManager, conf);

  // Saturate the first node's pipeline limit, leaving only the three
  // hosts on a single rack available for new pipelines.
  insertHeavyNodesIntoNodeManager(dns, 1);

  int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();

  // Both racks are alive but free capacity exists only on /rack2, so
  // placement must refuse to create a non-rack-aware pipeline.
  LambdaTestUtils.intercept(SCMException.class,
      "The cluster has multiple racks, but all nodes with " +
      "available pipeline capacity are on a single rack.",
      () -> placementPolicy.chooseDatanodes(
          new ArrayList<>(), new ArrayList<>(), nodesRequired, 0));

  // Mark the lone /rack1 node stale; with only one rack alive the three
  // /rack2 nodes become a valid placement.
  nodeManager.setNodeState(dns.get(0), HddsProtos.NodeState.STALE);

  List<DatanodeDetails> pickedDns = placementPolicy.chooseDatanodes(
      new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);

  assertEquals(3, pickedDns.size());
  for (int i = 1; i <= 3; i++) {
    assertTrue(pickedDns.contains(dns.get(i)));
  }
}

private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
HashSet<UUID> uuids = nodes.stream().
map(DatanodeDetails::getUuid).
Expand Down