@@ -321,18 +321,18 @@ public final class ScmConfigKeys {
"ozone.scm.pipeline.owner.container.count";
public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
// Pipeline placement policy:
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 2;
// Upper limit for how many pipelines a datanode can engage in.
public static final String OZONE_DATANODE_PIPELINE_LIMIT =
"ozone.datanode.pipeline.limit";
public static final int OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT = 2;

// Upper limit for how many pipelines can be created.
// Upper limit for how many pipelines can be created
// across the cluster nodes managed by SCM.
// Only for test purposes now.
public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
"ozone.scm.pipeline.number.limit";
public static final String OZONE_SCM_RATIS_PIPELINE_LIMIT =
"ozone.scm.ratis.pipeline.limit";
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
public static final int OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT = 0;

public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
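For orientation, here is a minimal sketch (not part of the patch) of how the renamed, zero-default cluster-wide limit is meant to behave. The helper name is hypothetical; the key string is the value of the constant above, read through the standard Hadoop Configuration API:

    import org.apache.hadoop.conf.Configuration;

    public final class PipelineLimitSketch {
      // Hypothetical helper: a limit of 0 disables the cluster-wide cap.
      static boolean canCreateRatisPipeline(Configuration conf, int openPipelines) {
        int limit = conf.getInt("ozone.scm.ratis.pipeline.limit", 0);
        return limit == 0 || openPipelines < limit;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("ozone.scm.ratis.pipeline.limit", 2);
        System.out.println(canCreateRatisPipeline(conf, 2)); // false: cap reached
      }
    }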
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.pipeline;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -57,7 +58,7 @@ public final class Pipeline {
// Current reported Leader for the pipeline
private UUID leaderId;
// Timestamp for pipeline upon creation
private Long creationTimestamp;
private Instant creationTimestamp;
// Only valid for Ratis THREE pipelines. No need to persist.
private int nodeIdsHash;

@@ -74,7 +75,7 @@ private Pipeline(PipelineID id, ReplicationType type,
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = System.currentTimeMillis();
this.creationTimestamp = Instant.now();
this.nodeIdsHash = 0;
}

@@ -119,7 +120,7 @@ public PipelineState getPipelineState() {
*
* @return Creation Timestamp
*/
public Long getCreationTimestamp() {
public Instant getCreationTimestamp() {
return creationTimestamp;
}

@@ -128,7 +129,7 @@ public Long getCreationTimestamp() {
*
* @param creationTimestamp
*/
void setCreationTimestamp(Long creationTimestamp) {
void setCreationTimestamp(Instant creationTimestamp) {
this.creationTimestamp = creationTimestamp;
}

@@ -253,7 +254,7 @@ public HddsProtos.Pipeline getProtobufMessage()
.setFactor(factor)
.setState(PipelineState.getProtobuf(state))
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp)
.setCreationTimeStamp(creationTimestamp.toEpochMilli())
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
@@ -289,6 +290,7 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
.setNodes(pipeline.getMembersList().stream()
.map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
.setNodesInOrder(pipeline.getMemberOrdersList())
.setCreateTimestamp(pipeline.getCreationTimeStamp())
.build();
}

@@ -357,7 +359,7 @@ public static class Builder {
private List<Integer> nodeOrder = null;
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Long creationTimestamp = null;
private Instant creationTimestamp = null;
private int nodeIdsHash = 0;

public Builder() {}
@@ -410,6 +412,11 @@ public Builder setNodesInOrder(List<Integer> orders) {
return this;
}

public Builder setCreateTimestamp(long createTimestamp) {
this.creationTimestamp = Instant.ofEpochMilli(createTimestamp);
return this;
}

public Builder setNodeIdsHash(int nodeIdsHash1) {
this.nodeIdsHash = nodeIdsHash1;
return this;
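The builder's new setCreateTimestamp(long) bridges the protobuf wire format (epoch milliseconds) and the in-memory Instant. A small standalone sketch (illustration only) showing the round trip is lossless at millisecond precision:

    import java.time.Instant;

    public final class TimestampRoundTrip {
      public static void main(String[] args) {
        Instant created = Instant.now();
        long wireMillis = created.toEpochMilli();            // written by getProtobufMessage()
        Instant restored = Instant.ofEpochMilli(wireMillis); // rebuilt by setCreateTimestamp()
        // Sub-millisecond precision is dropped, but the wire value round-trips exactly.
        System.out.println(restored.toEpochMilli() == wireMillis); // true
      }
    }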
9 changes: 5 additions & 4 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -840,14 +840,14 @@
</description>
</property>
<property>
<name>ozone.scm.datanode.max.pipeline.engagement</name>
<name>ozone.datanode.pipeline.limit</name>
<value>2</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Max number of pipelines a single datanode can be engaged in.
</description>
</property>
<property>
<name>ozone.scm.pipeline.number.limit</name>
<name>ozone.scm.ratis.pipeline.limit</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Upper limit for how many pipelines can be OPEN in SCM.
@@ -862,8 +862,9 @@
<description>
Timeout for a pipeline to stay in the ALLOCATED stage. When a pipeline is created,
it should reach the OPEN stage once a pipeline report is successfully received by SCM.
If a pipeline stays at ALLOCATED for too long, it should be scrubbed so that new
pipeline can be created. This timeout is for how long pipeline can stay at ALLOCATED
If a pipeline stays at ALLOCATED longer than the specified period of time,
it should be scrubbed so that a new pipeline can be created.
This timeout controls how long a pipeline can stay at the ALLOCATED
stage until it gets scrubbed.
</description>
</property>
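As a side note, the allocated-stage timeout is read via Hadoop's getTimeDuration, so suffixed values such as "5m" are accepted. A sketch, where the literal key string and the 5-minute default are assumptions inferred from OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;

    public final class AllocatedTimeoutSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("ozone.scm.pipeline.allocated.timeout", "5m"); // assumed key name
        long millis = conf.getTimeDuration(
            "ozone.scm.pipeline.allocated.timeout", 300_000L, TimeUnit.MILLISECONDS);
        System.out.println(millis); // 300000
      }
    }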
@@ -57,8 +57,7 @@ public Set<PipelineID> getPipelines(UUID datanode) {
* @return Number of pipelines or 0.
*/
public int getPipelinesCount(UUID datanode) {
Set<PipelineID> pipelines = getObjects(datanode);
return pipelines == null ? 0 : pipelines.size();
return getObjects(datanode).size();
}

/**
@@ -80,7 +79,7 @@ public synchronized void removePipeline(Pipeline pipeline) {
dn2ObjectMap.computeIfPresent(dnId,
(k, v) -> {
v.remove(pipeline.getId());
return v.isEmpty() ? null : v;
return v;
});
}
}
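The two hunks above are a pair: removePipeline now keeps an empty set in the map rather than dropping the key, which is what lets getPipelinesCount call size() without a null check. A standalone sketch of the difference, with simplified types rather than the real Node2PipelineMap internals:

    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public final class EmptySetSketch {
      public static void main(String[] args) {
        Map<UUID, Set<String>> dn2ObjectMap = new ConcurrentHashMap<>();
        UUID dn = UUID.randomUUID();
        dn2ObjectMap.computeIfAbsent(dn, k -> ConcurrentHashMap.newKeySet())
            .add("pipeline-1");
        // Old behavior returned null once empty, removing the mapping;
        // new behavior returns the set, so the key survives with size 0.
        dn2ObjectMap.computeIfPresent(dn, (k, v) -> {
          v.remove("pipeline-1");
          return v;
        });
        System.out.println(dn2ObjectMap.get(dn).size()); // 0, no NullPointerException
      }
    }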
@@ -72,8 +72,8 @@ public PipelinePlacementPolicy(final NodeManager nodeManager,
this.conf = conf;
this.stateManager = stateManager;
this.heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}

/**
@@ -113,7 +113,7 @@ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
}
boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
- pipelineNumDeductable) < heavyNodeCriteria;
if (!meet) {
if (!meet && LOG.isDebugEnabled()) {
LOG.debug("Pipeline Placement: can't place more pipeline on heavy " +
"datanode: " + datanodeDetails.getUuid().toString() +
" Heaviness: " + nodeManager.getPipelinesCount(datanodeDetails) +
@@ -143,17 +143,11 @@ List<DatanodeDetails> filterViableNodes(
}
int initialHealthyNodesCount = healthyNodes.size();
String msg;
if (initialHealthyNodesCount == 0) {
msg = "No healthy nodes found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_SUITABLE_NODE);
}

if (initialHealthyNodesCount < nodesRequired) {
LOG.warn("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
nodesRequired, initialHealthyNodesCount);
LOG.warn("Not enough healthy nodes to allocate pipeline." +
nodesRequired + " datanodes required. Found: " +
initialHealthyNodesCount);
msg = String.format("Pipeline creation failed due to no sufficient" +
" healthy datanodes. Required %d. Found %d.",
nodesRequired, initialHealthyNodesCount);
@@ -168,15 +162,17 @@
.collect(Collectors.toList());

if (healthyList.size() < nodesRequired) {
LOG.debug("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
" Nodes required: %d Found: %d, healthy nodes count in " +
"NodeManager: %d.",
heavyNodeCriteria, nodesRequired, healthyList.size(),
initialHealthyNodesCount);
msg = String.format("Pipeline creation failed due to not enough" +
" healthy datanodes after filter. Required %d. Found %d",
nodesRequired, initialHealthyNodesCount);
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to find enough nodes that meet the criteria that" +
" cannot engage in more than" + heavyNodeCriteria +
" pipelines. Nodes required: " + nodesRequired + " Found:" +
healthyList.size() + " healthy nodes count in NodeManager: " +
initialHealthyNodesCount);
}
msg = String.format("Pipeline creation failed because nodes are engaged" +
" in other pipelines and every node can only be engaged in" +
" max %d pipelines. Required %d. Found %d",
heavyNodeCriteria, nodesRequired, healthyList.size());
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
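The heavy-node criterion reduces to a stream filter over the healthy nodes. A simplified sketch under assumed types (the real meetCriteria also subtracts pipelineNumDeductable for pipelines being replaced):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.ToIntFunction;
    import java.util.stream.Collectors;

    public final class HeavyNodeFilterSketch {
      // limit plays the role of heavyNodeCriteria (ozone.datanode.pipeline.limit).
      static List<String> viableNodes(List<String> healthyNodes,
          ToIntFunction<String> pipelinesOn, int limit) {
        return healthyNodes.stream()
            .filter(dn -> pipelinesOn.applyAsInt(dn) < limit)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<String> viable = viableNodes(Arrays.asList("dn1", "dn2"),
            dn -> dn.equals("dn1") ? 2 : 1, 2);
        System.out.println(viable); // [dn2]: dn1 already carries 2 pipelines
      }
    }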
@@ -84,11 +84,11 @@ public class RatisPipelineProvider implements PipelineProvider {
this.placementPolicy =
new PipelinePlacementPolicy(nodeManager, stateManager, conf);
this.pipelineNumberLimit = conf.getInt(
ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT_DEFAULT);
this.maxPipelinePerDatanode = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}

private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
@@ -113,27 +113,21 @@ static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
}

/**
* Return first existed pipeline which share the same set of datanodes
* Return the list of pipelines that share the same set of datanodes
* with the input pipeline.
* @param stateManager PipelineStateManager
* @param pipeline input pipeline
* @return list of matched pipelines
*/
static Pipeline checkPipelineContainSameDatanodes(
static List<Pipeline> checkPipelineContainSameDatanodes(
PipelineStateManager stateManager, Pipeline pipeline) {
List<Pipeline> matchedPipelines = stateManager.getPipelines(
return stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE)
.stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
(// For all OPEN or ALLOCATED pipelines
p.getPipelineState() == Pipeline.PipelineState.OPEN ||
p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
p.getNodeIdsHash() == pipeline.getNodeIdsHash())
p.getPipelineState() != Pipeline.PipelineState.CLOSED &&
p.getNodeIdsHash() == pipeline.getNodeIdsHash()))
.collect(Collectors.toList());
if (matchedPipelines.size() == 0) {
return null;
} else {
return matchedPipelines.stream().findFirst().get();
}
}
}
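Call sites change shape accordingly: instead of null-checking a single pipeline, callers iterate a possibly-empty list. A sketch of the new usage pattern; it must live in the same package since the method is package-private, and OverlapUsageSketch is a hypothetical class:

    import java.util.List;

    final class OverlapUsageSketch {
      static void logOverlaps(PipelineStateManager stateManager, Pipeline pipeline) {
        List<Pipeline> overlaps =
            RatisPipelineUtils.checkPipelineContainSameDatanodes(stateManager, pipeline);
        for (Pipeline p : overlaps) { // previously: if (overlap != null) { ... }
          System.out.println(pipeline.getId() + " shares datanodes with " + p.getId());
        }
      }
    }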
@@ -45,6 +45,8 @@
import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -128,19 +130,6 @@ public void setPipelineProvider(ReplicationType replicationType,
pipelineFactory.setProvider(replicationType, provider);
}

private int computeNodeIdHash(Pipeline pipeline) {
if (pipeline.getType() != ReplicationType.RATIS) {
return 0;
}

if (pipeline.getFactor() != ReplicationFactor.THREE) {
return 0;
}

return RatisPipelineUtils.
encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes());
}

private void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
@@ -156,7 +145,8 @@ private void initializePipelineState() throws IOException {
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
Preconditions.checkNotNull(pipeline);
pipeline.setNodeIdsHash(computeNodeIdHash(pipeline));
pipeline.setNodeIdsHash(RatisPipelineUtils.
encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
@@ -177,17 +167,20 @@ public synchronized Pipeline createPipeline(ReplicationType type,
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}
Pipeline overlapPipeline = RatisPipelineUtils
List<Pipeline> overlapPipelines = RatisPipelineUtils
.checkPipelineContainSameDatanodes(stateManager, pipeline);
if (overlapPipeline != null) {
if (!overlapPipelines.isEmpty()) {
// Count 1 overlap at a time.
metrics.incNumPipelineContainSameDatanodes();
// TODO: remove once pipeline allocation is proven to be evenly distributed.
LOG.info("Pipeline: " + pipeline.getId().toString() +
" contains same datanodes as previous pipeline: " +
overlapPipeline.getId().toString() + " nodeIds: " +
pipeline.getNodes().get(0).getUuid().toString() +
", " + pipeline.getNodes().get(1).getUuid().toString() +
", " + pipeline.getNodes().get(2).getUuid().toString());
for (Pipeline overlapPipeline : overlapPipelines) {
LOG.info("Pipeline: " + pipeline.getId().toString() +
" contains same datanodes as previous pipelines: " +
overlapPipeline.getId().toString() + " nodeIds: " +
pipeline.getNodes().get(0).getUuid().toString() +
", " + pipeline.getNodes().get(1).getUuid().toString() +
", " + pipeline.getNodes().get(2).getUuid().toString());
}
}
return pipeline;
} catch (IOException ex) {
@@ -373,20 +366,21 @@ public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
// Only scrub RATIS THREE pipelines.
return;
}
Long currentTime = System.currentTimeMillis();
Instant currentTime = Instant.now();
Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
List<Pipeline> needToScrubPipelines = stateManager.getPipelines(type, factor,
Pipeline.PipelineState.ALLOCATED).stream()
.filter(p -> (currentTime - p.getCreationTimestamp()
>= pipelineScrubTimeoutInMills))
.filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
.toEpochMilli() >= pipelineScrubTimeoutInMills)
.collect(Collectors.toList());
for (Pipeline p : needToScrubPipelines) {
LOG.info("srubbing pipeline: id: " + p.getId().toString() +
" since it stays at ALLOCATED stage for " +
(currentTime - p.getCreationTimestamp())/60000 + " mins.");
Duration.between(p.getCreationTimestamp(), currentTime).toMinutes() +
" mins.");
finalizeAndDestroyPipeline(p, false);
}
}
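One subtlety in the scrub path above: Duration.between(start, end) is end minus start, so the creation instant must come first when computing how long a pipeline has sat in ALLOCATED (the log statement is written accordingly). A tiny sketch of the scrub predicate in isolation:

    import java.time.Duration;
    import java.time.Instant;

    public final class ScrubPredicateSketch {
      // True once the pipeline has stayed in ALLOCATED for at least timeoutMillis.
      static boolean shouldScrub(Instant createdAt, Instant now, long timeoutMillis) {
        return Duration.between(createdAt, now).toMillis() >= timeoutMillis;
      }

      public static void main(String[] args) {
        Instant created = Instant.now().minus(Duration.ofMinutes(6));
        System.out.println(shouldScrub(created, Instant.now(), 5 * 60_000L)); // true
      }
    }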
@@ -68,7 +68,7 @@ public static void setUp() throws Exception {
.getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
configuration
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
pipelineManager =
@@ -66,7 +66,7 @@
import org.junit.Test;
import org.mockito.Mockito;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;

/**
* Test DeadNodeHandler.
@@ -89,7 +89,7 @@ public void setup() throws IOException, AuthenticationException {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();