Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,29 @@ public List<DatanodeDetails> chooseDatanodes(
}
}

// Fall back logic for node pick up.
DatanodeDetails fallBackPickNodes(
List<DatanodeDetails> nodeSet, List<DatanodeDetails> excludedNodes)
throws SCMException{
DatanodeDetails node;
if (excludedNodes == null || excludedNodes.isEmpty()) {
node = chooseNode(nodeSet);
} else {
List<DatanodeDetails> inputNodes = nodeSet.stream()
.filter(p -> !excludedNodes.contains(p)).collect(Collectors.toList());
node = chooseNode(inputNodes);
}

if (node == null) {
String msg = String.format("Unable to find fall back node in" +
" pipeline allocation. nodeSet size: {}", nodeSet.size());
LOG.warn(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
return node;
}

/**
* Get result set based on the pipeline placement algorithm which considers
* network topology and rack awareness.
Expand All @@ -220,50 +243,59 @@ public List<DatanodeDetails> chooseDatanodes(
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
throws SCMException {
if (nodesRequired != HddsProtos.ReplicationFactor.THREE.getNumber()) {
throw new SCMException("Nodes required number is not supported: " +
nodesRequired, SCMException.ResultCodes.INVALID_CAPACITY);
}

// Assume rack awareness is not enabled.
boolean rackAwareness = false;
List <DatanodeDetails> results = new ArrayList<>(nodesRequired);
// Since nodes are widely distributed, the results should be selected
// base on distance in topology, rack awareness and load balancing.
List<DatanodeDetails> exclude = new ArrayList<>();
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
LOG.warn("Unable to find healthy node for anchor(first) node." +
" Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
if (anchor != null) {
results.add(anchor);
exclude.add(anchor);
} else {
LOG.warn("Unable to find healthy node for anchor(first) node.");
throw new SCMException("Unable to find anchor node.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
if (LOG.isDebugEnabled()) {
LOG.debug("First node chosen: {}", anchor);
}

results.add(anchor);
exclude.add(anchor);

// Choose the second node on different racks from anchor.
DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
DatanodeDetails nextNode = chooseNodeBasedOnRackAwareness(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
LOG.warn("Pipeline Placement: Unable to find 2nd node on different " +
"racks that meets the criteria. Required nodes: {}, Found nodes:" +
" {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Second node chosen: {}", nodeOnDifferentRack);
if (nextNode != null) {
// Rack awareness is detected.
rackAwareness = true;
results.add(nextNode);
exclude.add(nextNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Second node chosen: {}", nextNode);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
"rack based on rack awareness.");
}
}

results.add(nodeOnDifferentRack);
exclude.add(nodeOnDifferentRack);

// Then choose nodes close to anchor based on network topology
int nodesToFind = nodesRequired - results.size();
for (int x = 0; x < nodesToFind; x++) {
// invoke the choose function defined in the derived classes.
DatanodeDetails pick = chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
// Pick remaining nodes based on the existence of rack awareness.
DatanodeDetails pick = rackAwareness
? chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
: fallBackPickNodes(healthyNodes, exclude);
if (pick != null) {
results.add(pick);
exclude.add(pick);
Expand Down Expand Up @@ -293,6 +325,9 @@ public List<DatanodeDetails> getResultSet(
@Override
public DatanodeDetails chooseNode(
List<DatanodeDetails> healthyNodes) {
if (healthyNodes == null || healthyNodes.isEmpty()) {
return null;
}
int firstNodeNdx = getRand().nextInt(healthyNodes.size());
int secondNodeNdx = getRand().nextInt(healthyNodes.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,66 @@ public void testChooseNodeBasedOnRackAwareness() {
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topologyWithDifRacks, anchor);
Assert.assertNotNull(nextNode);
Assert.assertFalse(anchor.getNetworkLocation().equals(
nextNode.getNetworkLocation()));
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test simple test case with topology that all nodes on the same rack with topology aware and fall back enabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a test. Please check the latest commit.

public void testFallBackPickNodes() {
List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
DatanodeDetails node;
try {
node = placementPolicy.fallBackPickNodes(healthyNodes, null);
Assert.assertNotNull(node);
} catch (SCMException e) {
Assert.fail("Should not reach here.");
}

// when input nodeSet are all excluded.
List<DatanodeDetails> exclude = healthyNodes;
try {
node = placementPolicy.fallBackPickNodes(healthyNodes, exclude);
Assert.assertNull(node);
} catch (SCMException e) {
Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
e.getResult());
} catch (Exception ex) {
Assert.fail("Should not reach here.");
}
}

@Test
public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
// rack awareness is not enabled.
Assert.assertTrue(anchor.getNetworkLocation().equals(
randomNode.getNetworkLocation()));

NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topology, anchor);
// RackAwareness should not be able to choose any node.
Assert.assertNull(nextNode);

// PlacementPolicy should still be able to pick a set of 3 nodes.
int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> results = placementPolicy
.getResultSet(numOfNodes, healthyNodes);

Assert.assertEquals(numOfNodes, results.size());
// All nodes are on same rack.
Assert.assertEquals(results.get(0).getNetworkLocation(),
results.get(1).getNetworkLocation());
Assert.assertEquals(results.get(0).getNetworkLocation(),
results.get(2).getNetworkLocation());
}

private final static Node[] NODES = new NodeImpl[] {
new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
Expand Down