@@ -56,6 +56,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
private final int heavyNodeCriteria;
private static final int REQUIRED_RACKS = 2;

public static final String MULTIPLE_RACK_PIPELINE_MSG =
"The cluster has multiple racks, but all nodes with available " +
"pipeline capacity are on a single rack. There are insufficient " +
"cross rack nodes available to create a pipeline";

/**
* Constructs a pipeline placement policy that considers network topology,
* load balancing and rack awareness.
@@ -120,6 +125,7 @@ List<DatanodeDetails> filterViableNodes(
// get nodes in HEALTHY state
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
boolean multipleRacks = multipleRacksAvailable(healthyNodes);
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
@@ -163,9 +169,38 @@ List<DatanodeDetails> filterViableNodes(
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}

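// If the cluster spans multiple racks but exclusion and the filtering above
// have left viable candidates on only a single rack, fail fast rather than
// create a pipeline that cannot be rack aware.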
if (!checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
boolean multipleRacksAfterFilter = multipleRacksAvailable(healthyList);
if (multipleRacks && !multipleRacksAfterFilter) {
LOG.debug(MULTIPLE_RACK_PIPELINE_MSG);
throw new SCMException(MULTIPLE_RACK_PIPELINE_MSG,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
}
return healthyList;
}

/**
* Given a list of datanodes, return false if all of the nodes are on a
* single rack or the list has fewer than two entries. Return true if the
* nodes span more than one rack.
* @param dns List of datanodes to check
* @return True if there are multiple racks, false otherwise
*/
private boolean multipleRacksAvailable(List<DatanodeDetails> dns) {
if (dns.size() <= 1) {
return false;
}
String initialRack = dns.get(0).getNetworkLocation();
for (DatanodeDetails dn : dns) {
if (!dn.getNetworkLocation().equals(initialRack)) {
return true;
}
}
return false;
}

/**
* Pipeline placement chooses datanodes to join the pipeline.
*
@@ -390,6 +390,19 @@ public void clearCommandQueue(UUID dnId) {
}
}

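/**
 * Move the given datanode into the list that matches the supplied node
 * state, removing it from whichever state list currently holds it. Used by
 * tests to simulate a node changing state.
 */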
public void setNodeState(DatanodeDetails dn, HddsProtos.NodeState state) {
healthyNodes.remove(dn);
staleNodes.remove(dn);
deadNodes.remove(dn);
if (state == HEALTHY) {
healthyNodes.add(dn);
} else if (state == STALE) {
staleNodes.add(dn);
} else {
deadNodes.add(dn);
}
}

/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
@@ -44,7 +44,9 @@

import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -439,6 +441,85 @@ public void testValidatePlacementPolicySingleRackInCluster() {
assertEquals(0, status.misReplicationCount());
}

@Test
public void test3NodesInSameRackReturnedWhenOnlyOneHealthyRackIsPresent()
throws Exception {
List<DatanodeDetails> dns = setupSkewedRacks();

int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
// Set the only node on rack1 stale. This makes the cluster effectively a
// single rack.
nodeManager.setNodeState(dns.get(0), HddsProtos.NodeState.STALE);

// As there is only 1 rack alive, the 3 DNs on /rack2 should be returned
List<DatanodeDetails> pickedDns = placementPolicy.chooseDatanodes(
new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);

assertEquals(3, pickedDns.size());
assertTrue(pickedDns.contains(dns.get(1)));
assertTrue(pickedDns.contains(dns.get(2)));
assertTrue(pickedDns.contains(dns.get(3)));
}

@Rule
public ExpectedException thrownExp = ExpectedException.none();

@Test
public void testExceptionIsThrownWhenRackAwarePipelineCanNotBeCreated()
throws Exception {
thrownExp.expect(SCMException.class);
thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);

List<DatanodeDetails> dns = setupSkewedRacks();

// Set the first node to its pipeline limit. This means there are only
// 3 hosts on a single rack available for new pipelines
insertHeavyNodesIntoNodeManager(dns, 1);
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();

placementPolicy.chooseDatanodes(
new ArrayList<>(), new ArrayList<>(), nodesRequired, 0);
}

@Test
public void testExceptionThrownRackAwarePipelineCanNotBeCreatedExcludedNode()
throws Exception {
thrownExp.expect(SCMException.class);
thrownExp.expectMessage(PipelinePlacementPolicy.MULTIPLE_RACK_PIPELINE_MSG);

List<DatanodeDetails> dns = setupSkewedRacks();

// Set the first node to its pipeline limit. This means there are only
// 3 hosts on a single rack available for new pipelines
insertHeavyNodesIntoNodeManager(dns, 1);
int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();

List<DatanodeDetails> excluded = new ArrayList<>();
excluded.add(dns.get(0));
placementPolicy.chooseDatanodes(
excluded, new ArrayList<>(), nodesRequired, 0);
}

private List<DatanodeDetails> setupSkewedRacks() {
cluster = initTopology();

List<DatanodeDetails> dns = new ArrayList<>();
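// Skewed topology: one node on /rack1 and three nodes on /rack2.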
dns.add(MockDatanodeDetails
.createDatanodeDetails("host1", "/rack1"));
dns.add(MockDatanodeDetails
.createDatanodeDetails("host2", "/rack2"));
dns.add(MockDatanodeDetails
.createDatanodeDetails("host3", "/rack2"));
dns.add(MockDatanodeDetails
.createDatanodeDetails("host4", "/rack2"));

nodeManager = new MockNodeManager(cluster, dns,
false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
placementPolicy = new PipelinePlacementPolicy(
nodeManager, stateManager, conf);
return dns;
}

private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
HashSet<UUID> uuids = nodes.stream().
map(DatanodeDetails::getUuid).