ozone.scm.container.size
5GB
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
similarity index 90%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 77cdd83f7938..25457f72bc8c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -15,7 +15,7 @@
* the License.
*/
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+package org.apache.hadoop.hdds.scm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
@@ -33,25 +33,25 @@
import java.util.stream.Collectors;
/**
- * SCM CommonPolicy implements a set of invariants which are common
- * for all container placement policies, acts as the repository of helper
+ * This policy implements a set of invariants which are common
+ * for all basic placement policies, acts as the repository of helper
* functions which are common to placement policies.
*/
-public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
+public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
@VisibleForTesting
static final Logger LOG =
- LoggerFactory.getLogger(SCMCommonPolicy.class);
+ LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
/**
- * Constructs SCM Common Policy Class.
+ * Constructor.
*
* @param nodeManager NodeManager
* @param conf Configuration class.
*/
- public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
+ public SCMCommonPlacementPolicy(NodeManager nodeManager, Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
@@ -85,7 +85,7 @@ public Configuration getConf() {
}
/**
- * Given the replication factor and size required, return set of datanodes
+ * Given the size required, return a set of datanodes
* that satisfy the nodes and size requirement.
*
* Here are some invariants of container placement.
@@ -149,7 +149,7 @@ public List<DatanodeDetails> chooseDatanodes(
* @param datanodeDetails DatanodeDetails
* @return true if we have enough space.
*/
- boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
+ public boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
long sizeRequired) {
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
return (nodeMetric != null) && (nodeMetric.get() != null)
@@ -164,7 +164,7 @@ boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
* @param nodesRequired - Nodes Required
* @param healthyNodes - List of Nodes in the result set.
* @return List of Datanodes that can be used for placement.
- * @throws SCMException
+ * @throws SCMException SCMException
*/
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
@@ -190,8 +190,7 @@ public List<DatanodeDetails> getResultSet(
/**
* Choose a datanode according to the policy, this function is implemented
- * by the actual policy class. For example, PlacementCapacity or
- * PlacementRandom.
+ * by the actual policy class.
*
* @param healthyNodes - Set of healthy nodes we can choose from.
* @return DatanodeDetails
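For orientation, a concrete policy built on the renamed base class only has to implement the chooseNode() hook; health filtering, space checks and result assembly stay in SCMCommonPlacementPolicy. A minimal sketch of a hypothetical subclass (the class name and first-fit behaviour are illustrative, not part of this patch):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
    import org.apache.hadoop.hdds.scm.node.NodeManager;

    /** Hypothetical policy: takes the first remaining healthy candidate. */
    public final class FirstFitPlacementPolicy extends SCMCommonPlacementPolicy {

      public FirstFitPlacementPolicy(NodeManager nodeManager, Configuration conf) {
        super(nodeManager, conf);
      }

      @Override
      public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
        // The base class calls this repeatedly from getResultSet(); removing the
        // pick keeps it from being selected twice.
        return healthyNodes.remove(0);
      }
    }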
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index a0a722277adf..12302276a17c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -188,6 +188,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+ } catch (SCMException se) {
+ LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+ "Datanodes may be used up.", type, factor, se);
+ break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 5540d737cb92..2ba88567cd75 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -38,8 +38,9 @@
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
@@ -85,7 +86,7 @@ public class ReplicationManager implements MetricsSource {
* PlacementPolicy which is used to identify where a container
* should be replicated.
*/
- private final ContainerPlacementPolicy containerPlacement;
+ private final PlacementPolicy containerPlacement;
/**
* EventPublisher to fire Replicate and Delete container events.
@@ -131,12 +132,12 @@ public class ReplicationManager implements MetricsSource {
*
* @param conf OzoneConfiguration
* @param containerManager ContainerManager
- * @param containerPlacement ContainerPlacementPolicy
+ * @param containerPlacement PlacementPolicy
* @param eventPublisher EventPublisher
*/
public ReplicationManager(final ReplicationManagerConfiguration conf,
final ContainerManager containerManager,
- final ContainerPlacementPolicy containerPlacement,
+ final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final LockManager lockManager) {
this.containerManager = containerManager;
@@ -474,7 +475,7 @@ private void forceCloseContainer(final ContainerInfo container,
/**
* If the given container is under replicated, identify a new set of
- * datanode(s) to replicate the container using ContainerPlacementPolicy
+ * datanode(s) to replicate the container using PlacementPolicy
* and send replicate container command to the identified datanode(s).
*
* @param container ContainerInfo
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index 18ec2c385b0c..74431f9b05e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -34,22 +35,23 @@ public final class ContainerPlacementPolicyFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerPlacementPolicyFactory.class);
- private static final Class<? extends ContainerPlacementPolicy>
+ private static final Class<? extends PlacementPolicy>
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
SCMContainerPlacementRandom.class;
private ContainerPlacementPolicyFactory() {
}
- public static ContainerPlacementPolicy getPolicy(Configuration conf,
- final NodeManager nodeManager, NetworkTopology clusterMap,
- final boolean fallback, SCMContainerPlacementMetrics metrics)
- throws SCMException{
- final Class<? extends ContainerPlacementPolicy> placementClass = conf
+
+ public static PlacementPolicy getPolicy(
+ Configuration conf, final NodeManager nodeManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException{
+ final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
- ContainerPlacementPolicy.class);
- Constructor<? extends ContainerPlacementPolicy> constructor;
+ PlacementPolicy.class);
+ Constructor<? extends PlacementPolicy> constructor;
try {
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
Configuration.class, NetworkTopology.class, boolean.class,
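Operationally nothing changes for users of the factory: the placement implementation is still chosen through the key referenced above as OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY and loaded reflectively, only now against the relocated PlacementPolicy interface. A hedged sketch of selecting the rack-aware implementation programmatically (assumes the standard Hadoop Configuration.setClass API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.PlacementPolicy;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;

    /** Illustrative only: point the factory at the rack-aware implementation. */
    final class PlacementPolicyConfigSketch {
      static Configuration withRackAwarePlacement() {
        Configuration conf = new Configuration();
        // Valid after this patch, since the rack-aware policy now implements
        // PlacementPolicy via SCMCommonPlacementPolicy.
        conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
            SCMContainerPlacementRackAware.class, PlacementPolicy.class);
        return conf;
      }
    }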
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 85d281cf6dc2..19093448b927 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -65,7 +66,8 @@
* little or no work and the cluster will achieve a balanced distribution
* over time.
*/
-public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
+public final class SCMContainerPlacementCapacity
+ extends SCMCommonPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 8eccf451c989..8933fe953a7f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -45,7 +46,8 @@
* recommend to use this if the network topology has more layers.
*
*/
-public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
+public final class SCMContainerPlacementRackAware
+ extends SCMCommonPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
@@ -271,8 +273,8 @@ private Node chooseNode(List excludedNodes, Node affinityNode,
throw new SCMException("No satisfied datanode to meet the" +
" excludedNodes and affinityNode constrains.", null);
}
- if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
- LOG.debug("Datanode {} is chosen for container. Required size is {}",
+ if (super.hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
+ LOG.debug("Datanode {} is chosen. Required size is {}",
node.toString(), sizeRequired);
metrics.incrDatanodeChooseSuccessCount();
if (isFallbacked) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 6b1a5c8c6cb1..ce5d10d4e517 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -19,6 +19,8 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -37,8 +39,8 @@
* Balancer will need to support containers as a feature before this class
* can be practically used.
*/
-public final class SCMContainerPlacementRandom extends SCMCommonPolicy
- implements ContainerPlacementPolicy {
+public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
+ implements PlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index d8890fb0b920..d638ee9f0912 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -117,6 +118,13 @@ public interface NodeManager extends StorageContainerNodeProtocol,
*/
Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
+ /**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ int getPipelinesCount(DatanodeDetails datanodeDetails);
+
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
@@ -199,4 +207,10 @@ void processNodeReport(DatanodeDetails datanodeDetails,
* @return the given datanode, or null if not found
*/
DatanodeDetails getNodeByAddress(String address);
+
+ /**
+ * Get the cluster map, i.e. the network topology, for this node manager.
+ * @return cluster map
+ */
+ NetworkTopology getClusterNetworkTopologyMap();
}
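The two interface additions let placement code ask for a datanode's pipeline load and for the topology without depending on SCMNodeManager internals. A hypothetical caller-side sketch (helper names are illustrative):

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.net.NetworkTopology;
    import org.apache.hadoop.hdds.scm.node.NodeManager;

    /** Hypothetical helpers built on the new NodeManager methods. */
    final class NodeLoadSketch {

      /** True if the datanode is engaged in fewer pipelines than the limit. */
      static boolean underPipelineLimit(NodeManager nodeManager,
          DatanodeDetails dn, int limit) {
        return limit <= 0 || nodeManager.getPipelinesCount(dn) < limit;
      }

      /** True if every registered node effectively sits on a single rack. */
      static boolean singleRack(NodeManager nodeManager) {
        NetworkTopology topology = nodeManager.getClusterNetworkTopologyMap();
        return topology == null
            || topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1;
      }
    }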
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 954cb0e8ea46..9d2a9f224cd2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -283,6 +283,15 @@ public void addPipeline(Pipeline pipeline) {
node2PipelineMap.addPipeline(pipeline);
}
+ /**
+ * Get the count of pipelines associated with a single datanode.
+ * @param datanodeDetails single datanode
+ * @return number of pipelines associated with it
+ */
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+ }
+
/**
* Get information about the node.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index d3df858e6e6e..d7c6da930aab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -472,7 +472,6 @@ public Map getNodeInfo() {
return nodeInfo;
}
-
/**
* Get set of pipelines a datanode is part of.
* @param datanodeDetails - datanodeID
@@ -483,6 +482,15 @@ public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
}
+ /**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return nodeStateManager.getPipelinesCount(datanodeDetails);
+ }
/**
* Add pipeline information in the NodeManager.
@@ -609,6 +617,15 @@ public DatanodeDetails getNodeByAddress(String address) {
return null;
}
+ /**
+ * Get the cluster map, i.e. the network topology, for this node manager.
+ * @return cluster map
+ */
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return clusterMap;
+ }
+
private String nodeResolve(String hostname) {
List<String> hosts = new ArrayList<>(1);
hosts.add(hostname);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
index 37525b0076e8..57a377d998f4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -67,6 +67,7 @@ public boolean isKnownDatanode(UUID datanodeID) {
* @param datanodeID -- Datanode UUID
* @param containerIDs - List of ContainerIDs.
*/
+ @VisibleForTesting
public void insertNewDatanode(UUID datanodeID, Set containerIDs)
throws SCMException {
Preconditions.checkNotNull(containerIDs);
@@ -83,7 +84,8 @@ public void insertNewDatanode(UUID datanodeID, Set containerIDs)
*
* @param datanodeID - Datanode ID.
*/
- void removeDatanode(UUID datanodeID) {
+ @VisibleForTesting
+ public void removeDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index f8633f9fcbcd..496b9e7f341e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -42,7 +42,7 @@ public Node2PipelineMap() {
}
/**
- * Returns null if there no pipelines associated with this datanode ID.
+ * Returns null if there are no pipelines associated with this datanode ID.
*
* @param datanode - UUID
* @return Set of pipelines or Null.
@@ -51,6 +51,16 @@ public Set<PipelineID> getPipelines(UUID datanode) {
return getObjects(datanode);
}
+ /**
+ * Return 0 if there are no pipelines associated with this datanode ID.
+ * @param datanode - UUID
+ * @return Number of pipelines or 0.
+ */
+ public int getPipelinesCount(UUID datanode) {
+ Set<PipelineID> pipelines = getObjects(datanode);
+ return pipelines == null ? 0 : pipelines.size();
+ }
+
/**
* Adds a pipeline entry to a given dataNode in the map.
*
@@ -61,6 +71,10 @@ public synchronized void addPipeline(Pipeline pipeline) {
UUID dnId = details.getUuid();
dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
.add(pipeline.getId());
+ dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
+ v.add(pipeline.getId());
+ return v;
+ });
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 687356648c3c..6952f74ef29e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -96,6 +96,7 @@ private void createPipelines() {
if (scheduler.isClosed()) {
break;
}
+
pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 8d497fa1b03c..8d040f1df3f8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -57,7 +57,7 @@ public void onMessage(PipelineActionsFromDatanode report,
pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- LOG.error("Received pipeline action {} for {} from datanode {}. " +
+ LOG.info("Received pipeline action {} for {} from datanode {}. " +
"Reason : {}", action.getAction(), pipeline,
report.getDatanodeDetails(),
action.getClosePipeline().getDetailedReason());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
new file mode 100644
index 000000000000..e41675dace72
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline placement policy that chooses datanodes based on load balancing
+ * and network topology to supply pipeline creation.
+ *
+ * 1. Get a list of healthy nodes.
+ * 2. Filter out nodes that are too heavily engaged in other pipelines.
+ * 3. Choose an anchor node among the viable nodes.
+ * 4. Choose other nodes around the anchor node based on network topology.
+ */
+public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
+ @VisibleForTesting
+ static final Logger LOG =
+ LoggerFactory.getLogger(PipelinePlacementPolicy.class);
+ private final NodeManager nodeManager;
+ private final PipelineStateManager stateManager;
+ private final Configuration conf;
+ private final int heavyNodeCriteria;
+
+ /**
+ * Constructs a pipeline placement policy considering network topology,
+ * load balancing and rack awareness.
+ *
+ * @param nodeManager NodeManager
+ * @param stateManager PipelineStateManager
+ * @param conf Configuration
+ */
+ public PipelinePlacementPolicy(final NodeManager nodeManager,
+ final PipelineStateManager stateManager, final Configuration conf) {
+ super(nodeManager, conf);
+ this.nodeManager = nodeManager;
+ this.conf = conf;
+ this.stateManager = stateManager;
+ this.heavyNodeCriteria = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+ }
+
+ /**
+ * Returns true if this node meets the criteria.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @param nodesRequired nodes required count
+ * @return true if the node meets the criteria.
+ */
+ @VisibleForTesting
+ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+ if (heavyNodeCriteria == 0) {
+ // no limit applied.
+ return true;
+ }
+ // Datanodes from pipelines in certain states can still be considered
+ // available for pipeline allocation. The number of such pipelines is
+ // therefore deducted from the total heaviness calculation.
+ int pipelineNumDeductable = (int)stateManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.valueOf(nodesRequired),
+ Pipeline.PipelineState.CLOSED)
+ .stream().filter(
+ p -> nodeManager.getPipelines(datanodeDetails).contains(p.getId()))
+ .count();
+ boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+ - pipelineNumDeductable) < heavyNodeCriteria;
+ if (!meet) {
+ LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
+ "datanodeļ¼ " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+ nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+ heavyNodeCriteria);
+ }
+ return meet;
+ }
+
+ /**
+ * Filter for viable nodes based on
+ * 1. nodes that are healthy
+ * 2. nodes that are not too heavily engaged in other pipelines
+ *
+ * @param excludedNodes - excluded nodes
+ * @param nodesRequired - number of datanodes required.
+ * @return a list of viable nodes
+ * @throws SCMException when viable nodes are not enough in numbers
+ */
+ List<DatanodeDetails> filterViableNodes(
+ List<DatanodeDetails> excludedNodes, int nodesRequired)
+ throws SCMException {
+ // get nodes in HEALTHY state
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ if (excludedNodes != null) {
+ healthyNodes.removeAll(excludedNodes);
+ }
+ int initialHealthyNodesCount = healthyNodes.size();
+ String msg;
+ if (initialHealthyNodesCount == 0) {
+ msg = "No healthy node found to allocate pipeline.";
+ LOG.error(msg);
+ throw new SCMException(msg, SCMException.ResultCodes
+ .FAILED_TO_FIND_HEALTHY_NODES);
+ }
+
+ if (initialHealthyNodesCount < nodesRequired) {
+ msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ + " datanodes required. Found %d",
+ nodesRequired, initialHealthyNodesCount);
+ LOG.error(msg);
+ throw new SCMException(msg,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ // Filter nodes that meet the pipeline engagement criterion.
+ // Pipeline placement doesn't take remaining node space into account.
+ List<DatanodeDetails> healthyList = healthyNodes.stream()
+ .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+ .collect(Collectors.toList());
+
+ if (healthyList.size() < nodesRequired) {
+ msg = String.format("Unable to find enough nodes that meet " +
+ "the criteria that cannot engage in more than %d pipelines." +
+ " Nodes required: %d Found: %d, healthy nodes count in " +
+ "NodeManager: %d.",
+ heavyNodeCriteria, nodesRequired, healthyList.size(),
+ initialHealthyNodesCount);
+ LOG.error(msg);
+ throw new SCMException(msg,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return healthyList;
+ }
+
+ /**
+ * Pipeline placement choose datanodes to join the pipeline.
+ *
+ * @param excludedNodes - excluded nodes
+ * @param favoredNodes - list of nodes preferred.
+ * @param nodesRequired - number of datanodes required.
+ * @param sizeRequired - size required for the container or block.
+ * @return a list of chosen datanodeDetails
+ * @throws SCMException when chosen nodes are not enough in numbers
+ */
+ @Override
+ public List<DatanodeDetails> chooseDatanodes(
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+ int nodesRequired, final long sizeRequired) throws SCMException {
+ // Get a list of viable nodes based on criteria
+ // and make sure excludedNodes are excluded from list.
+ List<DatanodeDetails> healthyNodes =
+ filterViableNodes(excludedNodes, nodesRequired);
+
+ // Randomly picks nodes when all nodes are equal or factor is ONE.
+ // This happens when network topology is absent or
+ // all nodes are on the same rack.
+ if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+ return super.getResultSet(nodesRequired, healthyNodes);
+ } else {
+ // Since topology and rack awareness are available, picks nodes
+ // based on them.
+ return this.getResultSet(nodesRequired, healthyNodes);
+ }
+ }
+
+ /**
+ * Get result set based on the pipeline placement algorithm which considers
+ * network topology and rack awareness.
+ * @param nodesRequired - Nodes Required
+ * @param healthyNodes - List of Nodes in the result set.
+ * @return a list of datanodes
+ * @throws SCMException SCMException
+ */
+ @Override
+ public List<DatanodeDetails> getResultSet(
+ int nodesRequired, List<DatanodeDetails> healthyNodes)
+ throws SCMException {
+ List<DatanodeDetails> results = new ArrayList<>(nodesRequired);
+ // Since nodes are widely distributed, the results should be selected
+ // based on distance in topology, rack awareness and load balancing.
+ List<DatanodeDetails> exclude = new ArrayList<>();
+ // First choose an anchor node randomly.
+ DatanodeDetails anchor = chooseNode(healthyNodes);
+ if (anchor == null) {
+ LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+ "that meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ results.add(anchor);
+ exclude.add(anchor);
+ nodesRequired--;
+
+ // Choose the second node on a different rack from the anchor.
+ DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+ healthyNodes, exclude,
+ nodeManager.getClusterNetworkTopologyMap(), anchor);
+ if (nodeOnDifferentRack == null) {
+ LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+ " that meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ results.add(nodeOnDifferentRack);
+ exclude.add(nodeOnDifferentRack);
+ nodesRequired--;
+
+ // Then choose nodes close to anchor based on network topology
+ for (int x = 0; x < nodesRequired; x++) {
+ // invoke the choose function defined in the derived classes.
+ DatanodeDetails pick = chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+ if (pick != null) {
+ results.add(pick);
+ // exclude the picked node for next time
+ exclude.add(pick);
+ }
+ }
+
+ if (results.size() < nodesRequired) {
+ LOG.error("Pipeline Placement: Unable to find the required number of " +
+ "healthy nodes that meet the criteria. Required nodes: {}, " +
+ "Found nodes: {}", nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return results;
+ }
+
+ /**
+ * Find a node from the healthy list and return it after removing it from the
+ * list that we are operating on.
+ *
+ * @param healthyNodes - Set of healthy nodes we can choose from.
+ * @return chosen DatanodeDetails
+ */
+ @Override
+ public DatanodeDetails chooseNode(
+ List<DatanodeDetails> healthyNodes) {
+ int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+ int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+ DatanodeDetails datanodeDetails;
+ // There is a possibility that both numbers will be same.
+ // if that is so, we just return the node.
+ if (firstNodeNdx == secondNodeNdx) {
+ datanodeDetails = healthyNodes.get(firstNodeNdx);
+ } else {
+ DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
+ DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
+ SCMNodeMetric firstNodeMetric =
+ nodeManager.getNodeStat(firstNodeDetails);
+ SCMNodeMetric secondNodeMetric =
+ nodeManager.getNodeStat(secondNodeDetails);
+ datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
+ ? firstNodeDetails : secondNodeDetails;
+ }
+ // the pick is decided and it should be removed from candidates.
+ healthyNodes.remove(datanodeDetails);
+ return datanodeDetails;
+ }
+
+ /**
+ * Choose a node on a different rack from the anchor, based on rack awareness.
+ * If no node on a different rack can be found, return null.
+ * @param healthyNodes healthy nodes
+ * @param excludedNodes excluded nodes
+ * @param networkTopology network topology
+ * @param anchor anchor node
+ * @return a node on different rack
+ */
+ @VisibleForTesting
+ protected DatanodeDetails chooseNodeBasedOnRackAwareness(
+ List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes,
+ NetworkTopology networkTopology, DatanodeDetails anchor) {
+ Preconditions.checkArgument(networkTopology != null);
+ if (checkAllNodesAreEqual(networkTopology)) {
+ return null;
+ }
+
+ for (DatanodeDetails node : healthyNodes) {
+ if (excludedNodes.contains(node)
+ || networkTopology.isSameParent(anchor, node)) {
+ continue;
+ } else {
+ // the pick is decided and it should be removed from candidates.
+ healthyNodes.remove(node);
+ return node;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Check if all nodes are equal in topology.
+ * They are equal when the network topology is absent or they are on
+ * the same rack.
+ * @param topology network topology
+ * @return true when all nodes are equal
+ */
+ private boolean checkAllNodesAreEqual(NetworkTopology topology) {
+ if (topology == null) {
+ return true;
+ }
+ return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
+ }
+
+ /**
+ * Choose node based on network topology.
+ * @param networkTopology network topology
+ * @param anchor anchor datanode to start with
+ * @param excludedNodes excluded datanodes
+ * @return chosen datanode
+ */
+ @VisibleForTesting
+ protected DatanodeDetails chooseNodeFromNetworkTopology(
+ NetworkTopology networkTopology, DatanodeDetails anchor,
+ List<DatanodeDetails> excludedNodes) {
+ Preconditions.checkArgument(networkTopology != null);
+
+ Collection<Node> excluded = new ArrayList<>();
+ if (excludedNodes != null && excludedNodes.size() != 0) {
+ excluded.addAll(excludedNodes);
+ }
+ excluded.add(anchor);
+
+ Node pick = networkTopology.chooseRandom(
+ anchor.getNetworkLocation(), excluded);
+ DatanodeDetails pickedNode = (DatanodeDetails) pick;
+ // exclude the picked node for next time
+ if (excludedNodes != null) {
+ excludedNodes.add(pickedNode);
+ }
+ return pickedNode;
+ }
+}
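A short usage sketch of the new policy as a pipeline-creation caller would see it (the wrapper class is illustrative; the real caller is RatisPipelineProvider below). The size argument is deliberately 0, since pipeline placement does not weigh remaining space:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.exceptions.SCMException;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;

    /** Illustrative only: choose members for a three-node Ratis pipeline. */
    final class PipelinePlacementSketch {
      static List<DatanodeDetails> chooseThree(NodeManager nodeManager,
          PipelineStateManager stateManager, Configuration conf)
          throws SCMException {
        PipelinePlacementPolicy policy =
            new PipelinePlacementPolicy(nodeManager, stateManager, conf);
        // No exclusions, no favored nodes, three members, size ignored (0).
        return policy.chooseDatanodes(null, null, 3, 0);
      }
    }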
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378cd1835..8e0f32de1599 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
- pipelineMap = new HashMap<>();
- pipeline2container = new HashMap<>();
+ pipelineMap = new ConcurrentHashMap<>();
+ pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index a5e3d37eb92d..f6b80edcf96e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,13 +20,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.io.MultipleIOException;
@@ -44,13 +43,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
@@ -69,6 +62,7 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final PipelinePlacementPolicy placementPolicy;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -92,65 +86,53 @@ public class RatisPipelineProvider implements PipelineProvider {
this.stateManager = stateManager;
this.conf = conf;
this.tlsConfig = tlsConfig;
+ this.placementPolicy =
+ new PipelinePlacementPolicy(nodeManager, stateManager, conf);
}
-
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- // TODO: should we rename ContainerPlacementPolicy to PipelinePlacementPolicy?
- private static ContainerPlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends ContainerPlacementPolicy> implClass =
- (Class<? extends ContainerPlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
-
- try {
- Constructor<? extends ContainerPlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
-// LOG.error("Unhandled exception occurred, Placement policy will not " +
-// "be functional.");
- throw new IllegalArgumentException("Unable to load " +
- "ContainerPlacementPolicy", e);
- }
- }
-
- @Override
- public Pipeline create(ReplicationFactor factor) throws IOException {
- // Get set of datanodes already used for ratis pipeline
+ private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+ throws SCMException {
Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
+ stateManager.getPipelines(ReplicationType.RATIS, factor)
+ .stream().filter(
+ p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+ p.getPipelineState().equals(PipelineState.DORMANT) ||
+ p.getPipelineState().equals(PipelineState.ALLOCATED))
.forEach(p -> dnsUsed.addAll(p.getNodes()));
// Get list of healthy nodes
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY)
+ .parallelStream()
+ .filter(dn -> !dnsUsed.contains(dn))
+ .limit(factor.getNumber())
+ .collect(Collectors.toList());
if (dns.size() < factor.getNumber()) {
String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- factor.getNumber(), dns.size());
- throw new InsufficientDatanodesException(e);
+ .format("Cannot create pipeline of factor %d using %d nodes." +
+ " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+ dns.size(), dnsUsed.size(),
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+ throw new SCMException(e,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return dns;
+ }
+
+ @Override
+ public Pipeline create(ReplicationFactor factor) throws IOException {
+ List<DatanodeDetails> dns;
+
+ switch(factor) {
+ case ONE:
+ dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+ break;
+ case THREE:
+ dns = placementPolicy.chooseDatanodes(null,
+ null, factor.getNumber(), 0);
+ break;
+ default:
+ throw new IllegalStateException("Unknown factor: " + factor.name());
}
Pipeline pipeline = Pipeline.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 777a0b05aabd..21c4fbf4d398 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -64,8 +64,8 @@ static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
try {
destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
} catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
- pipeline.getId(), dn);
+ LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+ pipeline.getId(), dn, e.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d4db29..80c934f8f6ea 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -54,10 +55,6 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
@@ -84,6 +81,8 @@ public class SCMPipelineManager implements PipelineManager {
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private GrpcTlsConfig grpcTlsConfig;
+ private int pipelineNumberLimit;
+ private int heavyNodeCriteria;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
@@ -97,8 +96,8 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
- int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+ ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
final File metaDir = ServerUtils.getScmDbDir(conf);
final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
this.pipelineStore =
@@ -115,6 +114,12 @@ public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
"SCMPipelineManagerInfo", this);
initializePipelineState();
this.grpcTlsConfig = grpcTlsConfig;
+ this.pipelineNumberLimit = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+ this.heavyNodeCriteria = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
public PipelineStateManager getStateManager() {
@@ -147,10 +152,33 @@ private void initializePipelineState() throws IOException {
}
}
+ private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+ if (heavyNodeCriteria > 0 && factor == ReplicationFactor.THREE) {
+ return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+ stateManager.getPipelines(ReplicationType.RATIS, factor,
+ Pipeline.PipelineState.CLOSED).size()) >= heavyNodeCriteria *
+ nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY);
+ }
+
+ if (pipelineNumberLimit > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS).size() -
+ stateManager.getPipelines(ReplicationType.RATIS,
+ Pipeline.PipelineState.CLOSED).size()) >= pipelineNumberLimit;
+ }
+
+ return false;
+ }
+
@Override
public synchronized Pipeline createPipeline(
ReplicationType type, ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
+ if (type == ReplicationType.RATIS && exceedPipelineNumberLimit(factor)) {
+ lock.writeLock().unlock();
+ throw new SCMException("Pipeline number meets the limit: " +
+ pipelineNumberLimit,
+ SCMException.ResultCodes.FAILED_TO_FIND_HEALTHY_NODES);
+ }
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
@@ -160,10 +188,9 @@ public synchronized Pipeline createPipeline(
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
return pipeline;
- } catch (InsufficientDatanodesException idEx) {
- throw idEx;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
+ LOG.error("Pipeline creation failed.", ex);
throw ex;
} finally {
lock.writeLock().unlock();
@@ -172,7 +199,7 @@ public synchronized Pipeline createPipeline(
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
- List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
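To make the new caps concrete: when OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT is positive, the number of non-closed RATIS factor-THREE pipelines may not exceed that value times the healthy-node count (e.g. a limit of 5 with 10 healthy nodes allows 50); OZONE_SCM_PIPELINE_NUMBER_LIMIT, when positive, caps all non-closed RATIS pipelines outright. A small arithmetic sketch mirroring exceedPipelineNumberLimit() (illustrative, with the config values passed in directly):

    /** Illustrative arithmetic behind exceedPipelineNumberLimit(). */
    final class PipelineLimitSketch {

      /** Per-datanode engagement cap, applied to RATIS factor-THREE pipelines. */
      static boolean exceedsEngagementLimit(int nonClosedFactorThreePipelines,
          int heavyNodeCriteria, int healthyNodeCount) {
        return heavyNodeCriteria > 0
            && nonClosedFactorThreePipelines
                >= heavyNodeCriteria * healthyNodeCount;
      }

      /** Global cap, applied to all non-closed RATIS pipelines. */
      static boolean exceedsGlobalLimit(int nonClosedRatisPipelines,
          int pipelineNumberLimit) {
        return pipelineNumberLimit > 0
            && nonClosedRatisPipelines >= pipelineNumberLimit;
      }
    }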
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6ef3be1..1b23036946eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -123,6 +123,14 @@ void incNumPipelineCreated() {
numPipelineCreated.incr();
}
+ /**
+ * Get the number of pipeline created.
+ * @return number of pipeline
+ */
+ long getNumPipelineCreated() {
+ return numPipelineCreated.value();
+ }
+
/**
* Increments number of failed pipeline creation count.
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa3ed7b..54e2141e5aa9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -48,7 +48,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
String e = String
.format("Cannot create pipeline of factor %d using %d nodes.",
factor.getNumber(), dns.size());
- throw new IOException(e);
+ throw new InsufficientDatanodesException(e);
}
Collections.shuffle(dns);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d760fa4d..b3aac5ec804d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule
private final PipelineManager pipelineManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Set<DatanodeDetails> processedDatanodeDetails =
+ private final Set<PipelineID> processedPipelineIDs =
new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -116,46 +115,46 @@ protected void process(PipelineReportFromDatanode
// processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation.
Preconditions.checkNotNull(pipelineReportFromDatanode);
- DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
- if (!processedDatanodeDetails.contains(
- pipelineReportFromDatanode.getDatanodeDetails())) {
-
- Pipeline pipeline;
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
- // If the pipeline is open state mean, all 3 datanodes are reported
- // for this pipeline.
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- }
+
+ Pipeline pipeline;
+ PipelineReportsProto pipelineReport =
+ pipelineReportFromDatanode.getReport();
+
+ for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+ PipelineID pipelineID = PipelineID
+ .getFromProtobuf(report.getPipelineID());
+ if (processedPipelineIDs.contains(pipelineID)) {
+ continue;
+ }
+
+ try {
+ pipeline = pipelineManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ continue;
}
- if (scmInSafeMode()) {
- SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Healthy pipelines reported count is {}, " +
- "required healthy pipeline reported count is {}",
- currentHealthyPipelineCount, healthyPipelineThresholdCount);
+
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+ // If the pipeline is open state mean, all 3 datanodes are reported
+ // for this pipeline.
+ currentHealthyPipelineCount++;
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
}
- processedDatanodeDetails.add(dnDetails);
+ processedPipelineIDs.add(pipelineID);
}
+ if (scmInSafeMode()) {
+ SCMSafeModeManager.getLogger().info(
+ "SCM in safe mode. Healthy pipelines reported count is {}, " +
+ "required healthy pipeline reported count is {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount);
+ }
}
@Override
protected void cleanup() {
- processedDatanodeDetails.clear();
+ processedPipelineIDs.clear();
}
@VisibleForTesting
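The rule now deduplicates by PipelineID rather than by reporting datanode, so a factor-THREE pipeline is counted once no matter how many of its members report it. A condensed sketch of that counting behaviour (illustrative, not the rule's actual code):

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;

    /** Illustrative sketch of per-pipeline deduplication during safe mode. */
    final class HealthyPipelineCountSketch {

      private final Set<PipelineID> processed = new HashSet<>();
      private int healthyCount;

      /** Counts each OPEN factor-THREE pipeline exactly once. */
      void onReportedPipeline(Pipeline pipeline) {
        if (!processed.add(pipeline.getId())) {
          return; // already counted from another datanode's report
        }
        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE
            && pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
          healthyCount++;
        }
      }

      int getHealthyCount() {
        return healthyCount;
      }
    }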
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 7d9cb3e24646..7708bede9fac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -390,10 +390,10 @@ public void notifyObjectStageChange(StorageContainerLocationProtocolProtos
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException {
- // TODO: will be addressed in future patch.
- // This is needed only for debugging purposes to make sure cluster is
- // working correctly.
- return null;
+ Pipeline result = scm.getPipelineManager().createPipeline(type, factor);
+ AUDIT.logWriteSuccess(
+ buildAuditMessageForSuccess(SCMAction.CREATE_PIPELINE, null));
+ return result;
}
@Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 4ecab375c1bb..c25b3a0d9505 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -58,7 +58,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -394,7 +394,7 @@ private void initializeSystemManagers(OzoneConfiguration conf,
SCMContainerPlacementMetrics placementMetrics =
SCMContainerPlacementMetrics.create();
- ContainerPlacementPolicy containerPlacementPolicy =
+ PlacementPolicy containerPlacementPolicy =
ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
clusterMap, true, placementMetrics);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index b7a9813483d1..29da9daf893c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,11 +16,13 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -85,7 +87,7 @@ public class MockNodeManager implements NodeManager {
private final SCMNodeStat aggregateStat;
private boolean safemode;
private final Map> commandMap;
- private final Node2PipelineMap node2PipelineMap;
+ private Node2PipelineMap node2PipelineMap;
private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap;
private ConcurrentHashMap dnsToUuidMap;
@@ -99,6 +101,7 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.node2ContainerMap = new Node2ContainerMap();
this.dnsToUuidMap = new ConcurrentHashMap();
aggregateStat = new SCMNodeStat();
+ clusterMap = new NetworkTopologyImpl(new Configuration());
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -249,6 +252,16 @@ public Set getPipelines(DatanodeDetails dnId) {
return node2PipelineMap.getPipelines(dnId.getUuid());
}
+ /**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+ }
+
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
@@ -258,6 +271,22 @@ public void addPipeline(Pipeline pipeline) {
node2PipelineMap.addPipeline(pipeline);
}
+ /**
+ * Get the entire Node2PipelineMap.
+ * @return Node2PipelineMap
+ */
+ public Node2PipelineMap getNode2PipelineMap() {
+ return node2PipelineMap;
+ }
+
+ /**
+ * Set the Node2PipelineMap.
+ * @param node2PipelineMap Node2PipelineMap
+ */
+ public void setNode2PipelineMap(Node2PipelineMap node2PipelineMap) {
+ this.node2PipelineMap = node2PipelineMap;
+ }
+
/**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
@@ -488,6 +517,11 @@ public DatanodeDetails getNodeByAddress(String address) {
return getNodeByUuid(dnsToUuidMap.get(address));
}
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return clusterMap;
+ }
+
public void setNetworkTopology(NetworkTopology topology) {
this.clusterMap = topology;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1631447af1f5..63735f77b7ab 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -27,8 +27,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -66,7 +65,7 @@ public class TestReplicationManager {
private ReplicationManager replicationManager;
private ContainerStateManager containerStateManager;
- private ContainerPlacementPolicy containerPlacementPolicy;
+ private PlacementPolicy containerPlacementPolicy;
private EventQueue eventQueue;
private DatanodeCommandHandler datanodeCommandHandler;
@@ -93,7 +92,7 @@ public void setup() throws IOException, InterruptedException {
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID)invocation.getArguments()[0]));
- containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
+ containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
Mockito.when(containerPlacementPolicy.chooseDatanodes(
Mockito.anyListOf(DatanodeDetails.class),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index 18c4a64a0404..81f8c10fea63 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -99,7 +100,7 @@ public void testRackAwarePolicy() throws IOException {
when(nodeManager.getNodeStat(datanodes.get(4)))
.thenReturn(new SCMNodeMetric(storageCapacity, 70L, 30L));
- ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
+ PlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, nodeManager, cluster, true,
SCMContainerPlacementMetrics.create());
@@ -117,7 +118,7 @@ public void testRackAwarePolicy() throws IOException {
@Test
public void testDefaultPolicy() throws IOException {
- ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
+ PlacementPolicy policy = ContainerPlacementPolicyFactory
.getPolicy(conf, null, null, true, null);
Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
}
@@ -125,7 +126,7 @@ public void testDefaultPolicy() throws IOException {
/**
* A dummy container placement implementation for test.
*/
- public static class DummyImpl implements ContainerPlacementPolicy {
+ public static class DummyImpl implements PlacementPolicy {
@Override
public List chooseDatanodes(
List excludedNodes, List favoredNodes,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 26ffd8d1d34d..2206e4dfe76b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -27,8 +27,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -135,7 +134,7 @@ public void testContainerPlacementCapacity() throws IOException,
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
testDir.getAbsolutePath());
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
SCMNodeManager nodeManager = createNodeManager(conf);
SCMContainerManager containerManager =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54373f3..20cc3cf35830 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -64,6 +64,8 @@
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test DeadNodeHandler.
*/
@@ -84,6 +86,7 @@ public void setup() throws IOException, AuthenticationException {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
new file mode 100644
index 000000000000..1e340393c476
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.*;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
+/**
+ * Test for PipelinePlacementPolicy.
+ */
+public class TestPipelinePlacementPolicy {
+ private MockNodeManager nodeManager;
+ private OzoneConfiguration conf;
+ private PipelinePlacementPolicy placementPolicy;
+ private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+
+ @Before
+ public void init() throws Exception {
+ nodeManager = new MockNodeManager(true,
+ PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+ placementPolicy = new PipelinePlacementPolicy(
+ nodeManager, new PipelineStateManager(conf), conf);
+ }
+
+ @Test
+ public void testChooseNodeBasedOnNetworkTopology() {
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ // anchor should be removed from healthyNodes after being chosen.
+ Assert.assertFalse(healthyNodes.contains(anchor));
+
+ List<DatanodeDetails> excludedNodes =
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+ // excludedNodes should contain nextNode after being chosen.
+ Assert.assertTrue(excludedNodes.contains(nextNode));
+ // nextNode should not be the same as anchor.
+ Assert.assertNotEquals(anchor.getUuid(), nextNode.getUuid());
+ }
+
+ @Test
+ public void testChooseNodeBasedOnRackAwareness() {
+ List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+ DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ NetworkTopology topologyWithDifRacks =
+ createNetworkTopologyOnDifRacks();
+ DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+ healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ topologyWithDifRacks, anchor);
+ Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode));
+ }
+
+ private final static Node[] NODES = new NodeImpl[] {
+ new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+ };
+
+
+ private NetworkTopology createNetworkTopologyOnDifRacks() {
+ NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+ for (Node n : NODES) {
+ topology.add(n);
+ }
+ return topology;
+ }
+
+ private List<DatanodeDetails> overWriteLocationInNodes(
+ List<DatanodeDetails> datanodes) {
+ List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
+ for (int i = 0; i < datanodes.size(); i++) {
+ DatanodeDetails datanode = datanodes.get(i);
+ DatanodeDetails result = DatanodeDetails.newBuilder()
+ .setUuid(datanode.getUuidString())
+ .setHostName(datanode.getHostName())
+ .setIpAddress(datanode.getIpAddress())
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+ .setNetworkLocation(NODES[i].getNetworkLocation()).build();
+ results.add(result);
+ }
+ return results;
+ }
+
+ @Test
+ public void testHeavyNodeShouldBeExcluded() throws SCMException {
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+ // only a minority of the healthy nodes are heavily engaged in pipelines.
+ int minorityHeavy = healthyNodes.size()/2 - 1;
+ List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ nodesRequired, 0);
+ // modify node to pipeline mapping.
+ insertHeavyNodesIntoNodeManager(healthyNodes, minorityHeavy);
+ // nodes should be sufficient.
+ Assert.assertEquals(nodesRequired, pickedNodes1.size());
+ // make sure pipeline placement policy won't select duplicated nodes.
+ Assert.assertTrue(checkDuplicateNodesUUID(pickedNodes1));
+
+ // a majority of the healthy nodes are heavily engaged in pipelines.
+ int majorityHeavy = healthyNodes.size()/2 + 2;
+ insertHeavyNodesIntoNodeManager(healthyNodes, majorityHeavy);
+ boolean thrown = false;
+ List<DatanodeDetails> pickedNodes2 = null;
+ try {
+ pickedNodes2 = placementPolicy.chooseDatanodes(
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ nodesRequired, 0);
+ } catch (SCMException e) {
+ Assert.assertFalse(thrown);
+ thrown = true;
+ }
+ // nodes should NOT be sufficient and an exception should be thrown.
+ Assert.assertNull(pickedNodes2);
+ Assert.assertTrue(thrown);
+ }
+
+ private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
+ HashSet<UUID> uuids = nodes.stream().
+ map(DatanodeDetails::getUuid).
+ collect(Collectors.toCollection(HashSet::new));
+ return uuids.size() == nodes.size();
+ }
+
+ private Set<PipelineID> mockPipelineIDs(int count) {
+ Set<PipelineID> pipelineIDs = new HashSet<>(count);
+ for (int i = 0; i < count; i++) {
+ pipelineIDs.add(PipelineID.randomId());
+ }
+ return pipelineIDs;
+ }
+
+ private void insertHeavyNodesIntoNodeManager(
+ List<DatanodeDetails> nodes, int heavyNodeCount) throws SCMException {
+ if (nodes == null) {
+ throw new SCMException("",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ int considerHeavyCount =
+ conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
+
+ Node2PipelineMap mockMap = new Node2PipelineMap();
+ for (DatanodeDetails node : nodes) {
+ // mock heavy node
+ if (heavyNodeCount > 0) {
+ mockMap.insertNewDatanode(
+ node.getUuid(), mockPipelineIDs(considerHeavyCount));
+ heavyNodeCount--;
+ } else {
+ mockMap.insertNewDatanode(node.getUuid(), mockPipelineIDs(1));
+ }
+ }
+ nodeManager.setNode2PipelineMap(mockMap);
+ }
+}
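The test above treats a datanode as "heavy" once its pipeline count reaches the configured engagement limit. Below is a minimal sketch of such a filter, assuming the policy compares NodeManager#getPipelinesCount against OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT; the helper name is illustrative and not taken from this patch:

  // Illustrative filter: exclude datanodes that are already at or above
  // the configured pipeline engagement limit.
  private boolean belowEngagementLimit(DatanodeDetails dn) {
    int limit = conf.getInt(
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
    return nodeManager.getPipelinesCount(dn) < limit;
  }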
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
index 5572e9aa1ef4..4ad3456e7ba8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
@@ -25,8 +25,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -70,7 +69,7 @@ public void setup(boolean enabled) {
.thenReturn(new HashSet<>());
replicationManager = new ReplicationManager(
new ReplicationManagerConfiguration(),
- containerManager, Mockito.mock(ContainerPlacementPolicy.class),
+ containerManager, Mockito.mock(PlacementPolicy.class),
eventQueue, new LockManager(configuration));
scmPipelineManager = Mockito.mock(SCMPipelineManager.class);
blockManager = Mockito.mock(BlockManagerImpl.class);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 30a75efb1942..f36d97f96708 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -164,6 +165,16 @@ public Set getPipelines(DatanodeDetails dnId) {
throw new UnsupportedOperationException("Not yet implemented");
}
+ /**
+ * Get the count of pipelines a datanode is associated with.
+ * @param dnId DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails dnId) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
/**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
@@ -326,4 +337,9 @@ public DatanodeDetails getNodeByUuid(String address) {
public DatanodeDetails getNodeByAddress(String address) {
return null;
}
+
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return null;
+ }
}
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
index 1b95418338c5..1246faef9bce 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.scm.cli.pipeline.ActivatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.DeactivatePipelineSubcommand;
+import org.apache.hadoop.hdds.scm.cli.pipeline.CreatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
@@ -85,6 +86,7 @@
DeleteSubcommand.class,
CreateSubcommand.class,
CloseSubcommand.class,
+ CreatePipelineSubcommand.class,
ListPipelinesSubcommand.class,
ActivatePipelineSubcommand.class,
DeactivatePipelineSubcommand.class,
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java
new file mode 100644
index 000000000000..edeb786726a9
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of createPipeline command.
+ */
+@CommandLine.Command(
+ name = "createPipeline",
+ description = "create pipeline",
+ mixinStandardHelpOptions = true,
+ versionProvider = HddsVersionProvider.class)
+public class CreatePipelineSubcommand implements Callable<Void> {
+ @CommandLine.ParentCommand
+ private SCMCLI parent;
+
+ @CommandLine.Option(
+ names = {"-t", "--replicationType"},
+ description = "Replication type (STAND_ALONE, RATIS)",
+ defaultValue = "STAND_ALONE"
+ )
+ private HddsProtos.ReplicationType type
+ = HddsProtos.ReplicationType.STAND_ALONE;
+
+ @CommandLine.Option(
+ names = {"-f", "--replicationFactor"},
+ description = "Replication factor (ONE, THREE)",
+ defaultValue = "ONE"
+ )
+ private HddsProtos.ReplicationFactor factor
+ = HddsProtos.ReplicationFactor.ONE;
+
+ @Override
+ public Void call() throws Exception {
+ if (type == HddsProtos.ReplicationType.CHAINED) {
+ throw new IllegalArgumentException(type.name()
+ + " is not supported yet.");
+ }
+ try (ScmClient scmClient = parent.createScmClient()) {
+ scmClient.createReplicationPipeline(
+ type,
+ factor,
+ HddsProtos.NodePool.getDefaultInstance());
+ return null;
+ }
+ }
+}
\ No newline at end of file
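Assuming the standard SCM CLI entry point, the new subcommand could be invoked roughly as: ozone scmcli createPipeline -t RATIS -f THREE. The ozone scmcli wrapper is an assumption; the option names come from the command definition above.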
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559fd3a5..9bccb1acebbf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public void testPipelineCloseWithPipelineAction() throws Exception {
new PipelineActionHandler(pipelineManager, conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
- Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+ Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 6ace90cb248e..d0afbbebedc2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -34,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
@@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy {
public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
- .setHbInterval(1000)
+ .setPipelineNumber(numDatanodes + numDatanodes/3)
+ .setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
@@ -103,7 +108,7 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
} catch (IOException ioe) {
// As now all datanodes are shutdown, they move to stale state, there
// will be no sufficient datanodes to create the pipeline.
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+ Assert.assertTrue(ioe instanceof SCMException);
}
// make sure pipelines is destroyed
@@ -116,9 +121,13 @@ public void testPipelineCreationOnNodeRestart() throws Exception {
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
- // make sure pipelines is created after node start
- pipelineManager.triggerPipelineCreation();
- waitForPipelines(1);
+
+ if (cluster.getStorageContainerManager()
+ .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) > 0) {
+ // make sure pipelines is created after node start
+ pipelineManager.triggerPipelineCreation();
+ waitForPipelines(1);
+ }
}
private void waitForPipelines(int numPipelines)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4e654c..7526575820ea 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.pipeline;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,6 +32,8 @@
import java.util.ArrayList;
import java.util.List;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for RatisPipelineProvider.
*/
@@ -46,14 +47,17 @@ public class TestRatisPipelineProvider {
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
provider = new MockRatisPipelineProvider(nodeManager,
- stateManager, new OzoneConfiguration());
+ stateManager, conf);
}
private void createPipelineAndAssertions(
HddsProtos.ReplicationFactor factor) throws IOException {
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -61,10 +65,7 @@ private void createPipelineAndAssertions(
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should not overlap with the previous created pipeline
- Assert.assertTrue(
- CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
- .isEmpty());
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -77,6 +78,7 @@ public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -86,11 +88,7 @@ public void testCreatePipelineWithFactor() throws IOException {
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should overlap with the previous created pipeline,
- // and one datanode should overlap between the two types.
- Assert.assertEquals(
- CollectionUtils.intersection(pipeline.getNodes(),
- pipeline1.getNodes()).size(), 1);
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -139,45 +137,37 @@ public void testCreatePipelineWithNodes() {
@Test
public void testCreatePipelinesDnExclude() throws IOException {
- // We have 10 DNs in MockNodeManager.
- // Use up first 3 DNs for an open pipeline.
- List openPiplineDns = nodeManager.getAllNodes()
- .subList(0, 3);
+ List<DatanodeDetails> allHealthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ int totalHealthyNodesCount = allHealthyNodes.size();
+
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
- Pipeline openPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(openPiplineDns)
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .build();
-
- stateManager.addPipeline(openPipeline);
-
- // Use up next 3 DNs also for an open pipeline.
- List moreOpenPiplineDns = nodeManager.getAllNodes()
- .subList(3, 6);
- Pipeline anotherOpenPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(moreOpenPiplineDns)
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .build();
- stateManager.addPipeline(anotherOpenPipeline);
-
- // Use up next 3 DNs also for a closed pipeline.
- List closedPiplineDns = nodeManager.getAllNodes()
- .subList(6, 9);
- Pipeline anotherClosedPipeline = Pipeline.newBuilder()
- .setType(HddsProtos.ReplicationType.RATIS)
- .setFactor(factor)
- .setNodes(closedPiplineDns)
- .setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .build();
- stateManager.addPipeline(anotherClosedPipeline);
+ List<DatanodeDetails> closePipelineDns = new ArrayList<>();
+ for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+ List<DatanodeDetails> pipelineDns = allHealthyNodes
+ .subList(3 * i, 3 * (i + 1));
+
+ Pipeline.PipelineState state;
+ if (i % 2 == 0) {
+ state = Pipeline.PipelineState.OPEN;
+ } else {
+ state = Pipeline.PipelineState.CLOSED;
+ closePipelineDns.addAll(pipelineDns);
+ }
+
+ Pipeline newPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(pipelineDns)
+ .setState(state)
+ .setId(PipelineID.randomId())
+ .build();
+
+ stateManager.addPipeline(newPipeline);
+ nodeManager.addPipeline(newPipeline);
+ }
Pipeline pipeline = provider.create(factor);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
@@ -187,15 +177,9 @@ public void testCreatePipelinesDnExclude() throws IOException {
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
List pipelineNodes = pipeline.getNodes();
- // Pipline nodes cannot be from open pipelines.
- Assert.assertTrue(
- pipelineNodes.parallelStream().filter(dn ->
- (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
- .count() == 0);
-
// Since we have only 10 DNs, at least 1 pipeline node should have been
// from the closed pipeline DN list.
Assert.assertTrue(pipelineNodes.parallelStream().filter(
- closedPiplineDns::contains).count() > 0);
+ closePipelineDns::contains).count() > 0);
}
}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1224ed..9d5996011beb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -28,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -58,6 +60,7 @@ public class TestSCMPipelineManager {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,8 @@ public void testPipelineCreationFailedMetric() throws Exception {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail();
- } catch (InsufficientDatanodesException idEx) {
- Assert.assertEquals(
- "Cannot create pipeline of factor 3 using 1 nodes.",
- idEx.getMessage());
+ } catch (SCMException idEx) {
+ // pipeline creation failed this time.
}
metrics = getMetrics(
@@ -266,7 +267,7 @@ public void testPipelineCreationFailedMetric() throws Exception {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
- Assert.assertTrue(numPipelineCreateFailed == 0);
+ Assert.assertTrue(numPipelineCreateFailed == 1);
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67ae882a..1af2f74ee453 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ int numOfNodes = 4;
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(4)
+ .setNumDatanodes(numOfNodes)
+ // allow only one FACTOR THREE pipeline.
+ .setPipelineNumber(numOfNodes + 1)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555a509e..1caa302c1540 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.junit.Assert.fail;
/**
@@ -62,8 +63,11 @@ public void setup(int numDatanodes) throws Exception {
true);
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1000);
+
clusterBuilder = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
+ .setPipelineNumber(numDatanodes + numDatanodes/3)
.setHbInterval(1000)
.setHbProcessorInterval(1000);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0aba9689ffbe..4fe1701f010a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -266,6 +266,7 @@ abstract class Builder {
protected int numOfDatanodes = 1;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
+ protected int pipelineNumber = 3;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -352,6 +353,16 @@ public Builder setNumDatanodes(int val) {
return this;
}
+ /**
+ * Sets the total number of pipelines to create.
+ * @param val number of pipelines
+ * @return MiniOzoneCluster.Builder
+ */
+ public Builder setPipelineNumber(int val) {
+ pipelineNumber = val;
+ return this;
+ }
+
/**
* Sets the number of HeartBeat Interval of Datanodes, the value should be
* in MilliSeconds.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482bd908..39b2582bee9d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -494,6 +494,9 @@ void initializeConfiguration() throws IOException {
streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
+ // MiniOzoneCluster should have a global pipeline upper limit.
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ pipelineNumber == 3 ? 2 * numOfDatanodes : pipelineNumber);
configureTrace();
}
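Taken together, a test can combine the per-datanode engagement limit with the new builder option. The sketch below uses only calls that appear in this patch, but the numeric values are examples, not values from the patch:

  // Illustrative MiniOzoneCluster setup; values are examples only.
  OzoneConfiguration conf = new OzoneConfiguration();
  conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
  MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
      .setNumDatanodes(7)
      .setPipelineNumber(10)   // global pipeline cap for the mini cluster
      .setHbInterval(1000)
      .build();
  cluster.waitForClusterToBeReady();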
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index 129cf0488e37..6e307a036097 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -22,7 +22,7 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
@@ -53,7 +53,7 @@ public static void setup() throws Exception {
containerSizeGB * OzoneConsts.GB);
ozoneConf = new OzoneConfiguration();
ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
StorageContainerLocationProtocolClientSideTranslatorPB client =
cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 2d2d028884a3..7f2d62984609 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -29,8 +29,7 @@
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
- ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.protocolPB.
@@ -60,7 +59,7 @@ public class TestContainerStateMachineIdempotency {
public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
cluster =
MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index cf570d28f7c1..ea648c95d97e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 8649837a0cd0..908849749b33 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -89,8 +90,10 @@ public void init() throws Exception {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
- .setBlockSize(blockSize).setChunkSize(chunkSize)
+ .setPipelineNumber(10).setBlockSize(blockSize).setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES).build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index ea5190097188..ff9fad492c1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@ public static void init() throws Exception {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26fe64b..865e0b52a7a0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -54,8 +54,7 @@
import java.util.function.Predicate;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests delete key operation with a slow follower in the datanode
@@ -99,10 +98,12 @@ public static void init() throws Exception {
1000, TimeUnit.SECONDS);
conf.setLong("hdds.scm.replication.thread.interval",
containerReportInterval);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+ .setPipelineNumber(6).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 19a170797315..37b8a5ff40b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public static void init() throws Exception {
baseDir.mkdirs();
conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
- // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624fbf55..00556a8a0063 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
private static String bucketName;
private static String path;
private static XceiverClientManager xceiverClientManager;
+ private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
/**
* Create a MiniDFSCluster for testing.
@@ -111,10 +112,12 @@ public static void init() throws Exception {
1000, TimeUnit.SECONDS);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
-
conf.setQuietMode(false);
- cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+ int numOfDatanodes = 3;
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(numOfDatanodes)
+ .setPipelineNumber(numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+ .setHbInterval(100)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -176,7 +179,7 @@ public void testDeleteKeyWithSlowFollower() throws Exception {
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- Assert.assertTrue(pipelineList.size() == 1);
+ Assert.assertEquals(FACTOR_THREE_PIPELINE_COUNT, pipelineList.size());
Pipeline pipeline = pipelineList.get(0);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b8799d..03683237e691 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@ private void init() throws Exception {
1, TimeUnit.SECONDS);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@ private void init() throws Exception {
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(10).build();
+ .setNumDatanodes(10).setPipelineNumber(15).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e85ca2..84649e3458fe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+ .setPipelineNumber(5).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index fa8a289ea810..666264cb162d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -81,6 +81,7 @@ public static void init() throws Exception {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setPipelineNumber(5)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 96662471a3ed..6dbae6ae9d9d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +86,13 @@ private void startCluster(int datanodes) throws Exception {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(datanodes).build();
+ .setNumDatanodes(datanodes)
+ .setPipelineNumber(0)
+ .build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494a24b1..9e7e3c07bb5b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@ public void init() throws Exception {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index d91f739332ad..4710adaca48b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -164,6 +164,7 @@ public abstract class TestOzoneRpcClientAbstract {
static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setPipelineNumber(10)
.setScmId(scmId)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 9b5934911945..fa89f5b7f5b3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -53,8 +53,9 @@
import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +93,12 @@ private void startCluster(OzoneConfiguration conf) throws Exception {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setPipelineNumber(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c96766..763f6395718e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test container closing.
*/
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
+ .setPipelineNumber(15)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
index 863a2b3359fb..3019222ca6db 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -30,7 +30,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -83,7 +83,7 @@ public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 4c62c70db7f0..ca7a6157152e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.protocolPB
@@ -59,7 +59,7 @@ public class TestContainerSmallFile {
public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index 8e4645f01af8..feb74c230f20 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -31,8 +31,7 @@
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.
StorageContainerException;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
- ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.
SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.protocolPB.
@@ -62,7 +61,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
public static void init() throws Exception {
ozoneConfig = new OzoneConfiguration();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+ SCMContainerPlacementCapacity.class, PlacementPolicy.class);
cluster =
MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807aedb8..191589a38cbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public void setup() throws Exception {
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(4)
+ .setPipelineNumber(10)
.build();
cluster.waitForClusterToBeReady();
metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index c9b8c89e04da..618212a98738 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -77,9 +78,11 @@ public void setUp() throws Exception {
conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
+ .setPipelineNumber(numOfDatanodes + numOfDatanodes/2)
.build();
cluster.waitForClusterToBeReady();
scmClient = new ContainerOperationClient(cluster
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index ab351913d7a0..bd7173c4afab 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -99,6 +99,7 @@ public void init() throws Exception {
conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
OMStorage omStore = new OMStorage(conf);
omStore.setClusterId(clusterId);
@@ -108,6 +109,8 @@ public void init() throws Exception {
// Start the cluster
cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setNumDatanodes(7)
+ .setPipelineNumber(10)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822bf4d4..eb19fe77de08 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
static void startCluster(OzoneConfiguration conf) throws Exception {
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(5).build();
+ .setNumDatanodes(5).setPipelineNumber(8).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab60226d..cc922f240e54 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public static void init() throws Exception {
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
+ .setPipelineNumber(8)
.build();
cluster.waitForClusterToBeReady();
}