@@ -21,11 +21,13 @@
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

@@ -59,8 +61,6 @@ public final class Pipeline {
private UUID leaderId;
// Timestamp for pipeline upon creation
private Instant creationTimestamp;
// Only valid for Ratis THREE pipeline. No need persist.
private int nodeIdsHash;

/**
* The immutable properties of pipeline object is used in
@@ -76,7 +76,6 @@ private Pipeline(PipelineID id, ReplicationType type,
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = Instant.now();
this.nodeIdsHash = 0;
}

/**
@@ -133,14 +132,6 @@ void setCreationTimestamp(Instant creationTimestamp) {
this.creationTimestamp = creationTimestamp;
}

public int getNodeIdsHash() {
return nodeIdsHash;
}

void setNodeIdsHash(int nodeIdsHash) {
this.nodeIdsHash = nodeIdsHash;
}

/**
* Return the pipeline leader's UUID.
*
@@ -166,6 +157,23 @@ public List<DatanodeDetails> getNodes() {
return new ArrayList<>(nodeStatus.keySet());
}

/**
* Return an immutable set of nodes which form this pipeline.
* @return Set of DatanodeDetails
*/
public Set<DatanodeDetails> getNodeSet() {
return Collections.unmodifiableSet(nodeStatus.keySet());
}

/**
* Check if the input pipeline shares the same set of datanodes.
* @param pipeline the pipeline to compare against
* @return true if the input pipeline shares the same set of datanodes.
*/
public boolean sameDatanodes(Pipeline pipeline) {
return getNodeSet().equals(pipeline.getNodeSet());
}

/**
* Returns the leader if found else defaults to closest node.
*
@@ -360,7 +368,6 @@ public static class Builder {
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Instant creationTimestamp = null;
private int nodeIdsHash = 0;

public Builder() {}

@@ -373,7 +380,6 @@ public Builder(Pipeline pipeline) {
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
this.nodeIdsHash = 0;
}

public Builder setId(PipelineID id1) {
@@ -417,11 +423,6 @@ public Builder setCreateTimestamp(long createTimestamp) {
return this;
}

public Builder setNodeIdsHash(int nodeIdsHash1) {
this.nodeIdsHash = nodeIdsHash1;
return this;
}

public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
@@ -430,7 +431,6 @@ public Pipeline build() {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
pipeline.setNodeIdsHash(nodeIdsHash);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
pipeline.setCreationTimestamp(creationTimestamp);
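With nodeIdsHash gone, callers compare pipeline membership through getNodeSet() and sameDatanodes(). A minimal usage sketch, with the builder calls taken from elsewhere in this diff; dn1, dn2, dn3 are hypothetical DatanodeDetails instances and the java.util.Arrays import is elided:

// Two pipelines over the same three datanodes, listed in different order.
Pipeline p1 = Pipeline.newBuilder()
    .setId(PipelineID.randomId())
    .setType(ReplicationType.RATIS)
    .setFactor(ReplicationFactor.THREE)
    .setState(PipelineState.ALLOCATED)
    .setNodes(Arrays.asList(dn1, dn2, dn3))
    .build();
Pipeline p2 = Pipeline.newBuilder()
    .setId(PipelineID.randomId())
    .setType(ReplicationType.RATIS)
    .setFactor(ReplicationFactor.THREE)
    .setState(PipelineState.ALLOCATED)
    .setNodes(Arrays.asList(dn3, dn1, dn2))
    .build();
// Set equality is order-insensitive, so this holds despite the ordering.
assert p1.sameDatanodes(p2);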
@@ -145,12 +145,10 @@ List<DatanodeDetails> filterViableNodes(
String msg;

if (initialHealthyNodesCount < nodesRequired) {
LOG.warn("Not enough healthy nodes to allocate pipeline." +
nodesRequired + " datanodes required. Found: " +
initialHealthyNodesCount);
msg = String.format("Pipeline creation failed due to insufficient" +
" healthy datanodes. Required %d. Found %d.",
nodesRequired, initialHealthyNodesCount);
LOG.warn(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
@@ -229,42 +227,49 @@ public List<DatanodeDetails> getResultSet(
// First choose an anchor node randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
LOG.warn("Unable to find healthy nodes." +
LOG.warn("Unable to find healthy node for anchor(first) node." +
" Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
if (LOG.isDebugEnabled()) {
LOG.debug("First node chosen: {}", anchor);
}

results.add(anchor);
exclude.add(anchor);
nodesRequired--;

// Choose the second node on different racks from anchor.
DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
LOG.warn("Pipeline Placement: Unable to find nodes on different racks " +
" that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
LOG.warn("Pipeline Placement: Unable to find 2nd node on different " +
"racks that meets the criteria. Required nodes: {}, Found nodes:" +
" {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Second node chosen: {}", nodeOnDifferentRack);
}

results.add(nodeOnDifferentRack);
exclude.add(nodeOnDifferentRack);
nodesRequired--;

// Then choose nodes close to anchor based on network topology
for (int x = 0; x < nodesRequired; x++) {
int nodesToFind = nodesRequired - results.size();
for (int x = 0; x < nodesToFind; x++) {
// invoke the choose function defined in the derived classes.
DatanodeDetails pick = chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
if (pick != null) {
results.add(pick);
// exclude the picked node for next time
exclude.add(pick);
if (LOG.isDebugEnabled()) {
LOG.debug("Remaining node chosen: {}", pick);
}
}
}

@@ -306,9 +311,7 @@ public DatanodeDetails chooseNode(
datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
? firstNodeDetails : secondNodeDetails;
}
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(datanodeDetails);

return datanodeDetails;
}

@@ -331,12 +334,10 @@ protected DatanodeDetails chooseNodeBasedOnRackAwareness(
}

for (DatanodeDetails node : healthyNodes) {
if (excludedNodes.contains(node)
|| networkTopology.isSameParent(anchor, node)) {
if (excludedNodes.contains(node) ||
anchor.getNetworkLocation().equals(node.getNetworkLocation())) {
continue;
} else {
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(node);
return node;
}
}
@@ -374,15 +375,10 @@ protected DatanodeDetails chooseNodeFromNetworkTopology(
if (excludedNodes != null && excludedNodes.size() != 0) {
excluded.addAll(excludedNodes);
}
excluded.add(anchor);

Node pick = networkTopology.chooseRandom(
anchor.getNetworkLocation(), excluded);
DatanodeDetails pickedNode = (DatanodeDetails) pick;
// exclude the picked node for next time
if (excludedNodes != null) {
excludedNodes.add(pickedNode);
}
return pickedNode;
}
}
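Two contract changes in this policy are easy to miss in the diff: the same-rack test now compares network locations directly instead of calling networkTopology.isSameParent(), and the choose* helpers no longer mutate the caller's exclude list, so getResultSet records each pick itself. A sketch of the new caller-side pattern, with names taken from this diff; candidate is hypothetical:

// Same-rack predicate as now written: equal network locations mean same rack.
boolean sameRack =
    anchor.getNetworkLocation().equals(candidate.getNetworkLocation());

// Caller-owned exclusion: the helper no longer appends its pick, so the
// caller seeds the list and records every chosen node itself.
List<DatanodeDetails> exclude = new ArrayList<>();
exclude.add(anchor);
DatanodeDetails pick = chooseNodeFromNetworkTopology(
    nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
if (pick != null) {
  exclude.add(pick); // was previously done inside the helper
}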
@@ -132,13 +132,6 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
}
// Amend nodeIdsHash if needed.
if (pipeline.getType() == ReplicationType.RATIS &&
pipeline.getFactor() == ReplicationFactor.THREE &&
pipeline.getNodeIdsHash() == 0) {
pipeline.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
}
return pipeline;
}

@@ -157,7 +157,6 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
}

List<DatanodeDetails> dns;
int nodeIdHash = 0;

switch(factor) {
case ONE:
@@ -166,7 +165,6 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
case THREE:
dns = placementPolicy.chooseDatanodes(null,
null, factor.getNumber(), 0);
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
break;
default:
throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -178,7 +176,6 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.setNodeIdsHash(nodeIdHash)
.build();

// Send command to datanodes to create pipeline
@@ -199,17 +196,12 @@
@Override
public Pipeline create(ReplicationFactor factor,
List<DatanodeDetails> nodes) {
int nodeIdHash = 0;
if (factor == ReplicationFactor.THREE) {
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.setNodeIdsHash(nodeIdHash)
.build();
}

@@ -103,31 +103,21 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
}
}

static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
return 0;
}
return nodes.get(0).getUuid().hashCode() ^
nodes.get(1).getUuid().hashCode() ^
nodes.get(2).getUuid().hashCode();
}

/**
* Return the list of pipelines that share the same set of datanodes
* with the input pipeline.
* @param stateManager PipelineStateManager
* @param pipeline input pipeline
* @return first matched pipeline
* @return list of matched pipelines
*/
static List<Pipeline> checkPipelineContainSameDatanodes(
PipelineStateManager stateManager, Pipeline pipeline) {
return stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE)
.stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
(// For all OPEN or ALLOCATED pipelines
p.getPipelineState() != Pipeline.PipelineState.CLOSED &&
p.getNodeIdsHash() == pipeline.getNodeIdsHash()))
(p.getPipelineState() != Pipeline.PipelineState.CLOSED &&
p.sameDatanodes(pipeline)))
.collect(Collectors.toList());
}
}
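The deleted encodeNodeIdsOfFactorThreePipeline() fingerprinted a node set by XOR-ing the three datanode UUID hash codes. XOR is order-independent, which made it work as a cheap set signature, but distinct sets can collide, so checkPipelineContainSameDatanodes could report false matches. A contrived sketch with made-up hash values:

// Old scheme: pipelines "matched" when the XORs of their nodes' UUID
// hash codes were equal. Different triples can XOR to the same value:
int fingerprintA = 1 ^ 2 ^ 3; // == 0
int fingerprintB = 4 ^ 5 ^ 1; // == 0 as well: a collision
// The new filter calls p.sameDatanodes(pipeline) instead; exact Set
// equality cannot yield such false positives.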
@@ -145,8 +145,6 @@ private void initializePipelineState() throws IOException {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
Preconditions.checkNotNull(pipeline);
pipeline.setNodeIdsHash(RatisPipelineUtils.
encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
@@ -73,8 +73,6 @@ public Pipeline create(HddsProtos.ReplicationFactor factor)
.setType(initialPipeline.getType())
.setFactor(factor)
.setNodes(initialPipeline.getNodes())
.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
.build();
}
}
@@ -93,8 +91,6 @@ public Pipeline create(HddsProtos.ReplicationFactor factor,
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.setNodeIdsHash(RatisPipelineUtils
.encodeNodeIdsOfFactorThreePipeline(nodes))
.build();
}
}
@@ -65,10 +65,10 @@ public void testChooseNodeBasedOnNetworkTopology() {

List<DatanodeDetails> excludedNodes =
new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
excludedNodes.add(anchor);
DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
// excludedNodes should no longer contain nextNode after being chosen.
Assert.assertTrue(excludedNodes.contains(nextNode));
Assert.assertFalse(excludedNodes.contains(nextNode));
// nextNode should not be the same as anchor.
Assert.assertNotEquals(anchor.getUuid(), nextNode.getUuid());
}
@@ -83,7 +83,8 @@ public void testChooseNodeBasedOnRackAwareness() {
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topologyWithDifRacks, anchor);
Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode));
Assert.assertFalse(anchor.getNetworkLocation().equals(
nextNode.getNetworkLocation()));
}

private final static Node[] NODES = new NodeImpl[] {