Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* the License.
*/

package org.apache.hadoop.hdds.scm.container.placement.algorithms;
package org.apache.hadoop.hdds.scm;

import org.apache.hadoop.hdds.protocol.DatanodeDetails;

import java.io.IOException;
import java.util.List;

/**
* A ContainerPlacementPolicy support choosing datanodes to build replication
* pipeline with specified constraints.
* A PlacementPolicy support choosing datanodes to build
* pipelines or containers with specified constraints.
*/
public interface ContainerPlacementPolicy {
public interface PlacementPolicy {

/**
* Given the replication factor and size required, return set of datanodes
* that satisfy the nodes and size requirement.
* Given an initial set of datanodes and the size required,
* return set of datanodes that satisfy the nodes and size requirement.
*
* @param excludedNodes - list of nodes to be excluded.
* @param favoredNodes - list of nodes preferred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,19 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
"ozone.scm.pipeline.owner.container.count";
public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
// Pipeline placement policy:
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;

// Upper limit for how many pipelines can be created.
// Only for test purpose now.
public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
"ozone.scm.datanode.pipeline.number.limit";
// Setting to zero by default means this limit doesn't take effect.
public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;

public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public enum SCMAction implements AuditAction {
GET_CONTAINER,
GET_CONTAINER_WITH_PIPELINE,
LIST_CONTAINER,
CREATE_PIPELINE,
LIST_PIPELINE,
CLOSE_PIPELINE,
ACTIVATE_PIPELINE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,14 @@ public ObjectStageChangeResponseProto notifyObjectStageChange(
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)
throws ServiceException {
// TODO : Wiring this up requires one more patch.
return null;
try (Scope scope = TracingUtil
.importAndCreateScope("createPipeline", request.getTraceID())) {
impl.createReplicationPipeline(request.getReplicationType(),
request.getReplicationFactor(), request.getNodePool());
return PipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}

@Override
Expand Down
20 changes: 18 additions & 2 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -823,9 +823,11 @@
</value>
<tag>OZONE, MANAGEMENT</tag>
<description>
The full name of class which implements org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy.
The full name of class which implements
org.apache.hadoop.hdds.scm.PlacementPolicy.
The class decides which datanode will be used to host the container replica. If not set,
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default value.
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default
value.
</description>
</property>
<property>
Expand All @@ -835,6 +837,20 @@
<description>Number of containers per owner in a pipeline.
</description>
</property>
<property>
<name>ozone.scm.datanode.max.pipeline.engagement</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Max number of pipelines per datanode can be engaged in.
</description>
</property>
<property>
<name>ozone.scm.datanode.pipeline.number.limit</name>
<value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Upper limit for how many pipelines can be created in SCM.
</description>
</property>
<property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* the License.
*/

package org.apache.hadoop.hdds.scm.container.placement.algorithms;
package org.apache.hadoop.hdds.scm;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,25 +33,25 @@
import java.util.stream.Collectors;

/**
* SCM CommonPolicy implements a set of invariants which are common
* for all container placement policies, acts as the repository of helper
* This policy implements a set of invariants which are common
* for all basic placement policies, acts as the repository of helper
* functions which are common to placement policies.
*/
public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMCommonPolicy.class);
LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;

/**
* Constructs SCM Common Policy Class.
* Constructor.
*
* @param nodeManager NodeManager
* @param conf Configuration class.
*/
public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
public SCMCommonPlacementPolicy(NodeManager nodeManager, Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
Expand Down Expand Up @@ -85,7 +85,7 @@ public Configuration getConf() {
}

/**
* Given the replication factor and size required, return set of datanodes
* Given size required, return set of datanodes
* that satisfy the nodes and size requirement.
* <p>
* Here are some invariants of container placement.
Expand Down Expand Up @@ -149,7 +149,7 @@ public List<DatanodeDetails> chooseDatanodes(
* @param datanodeDetails DatanodeDetails
* @return true if we have enough space.
*/
boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
public boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
long sizeRequired) {
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
return (nodeMetric != null) && (nodeMetric.get() != null)
Expand All @@ -164,7 +164,7 @@ boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
* @param nodesRequired - Nodes Required
* @param healthyNodes - List of Nodes in the result set.
* @return List of Datanodes that can be used for placement.
* @throws SCMException
* @throws SCMException SCMException
*/
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
Expand All @@ -190,8 +190,7 @@ public List<DatanodeDetails> getResultSet(

/**
* Choose a datanode according to the policy, this function is implemented
* by the actual policy class. For example, PlacementCapacity or
* PlacementRandom.
* by the actual policy class.
*
* @param healthyNodes - Set of healthy nodes we can choose from.
* @return DatanodeDetails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public AllocatedBlock allocateBlock(final long size, ReplicationType type,
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
} catch (SCMException se) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
"Datanodes may be used up.", type, factor, se);
break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
Expand Down Expand Up @@ -85,7 +86,7 @@ public class ReplicationManager implements MetricsSource {
* PlacementPolicy which is used to identify where a container
* should be replicated.
*/
private final ContainerPlacementPolicy containerPlacement;
private final PlacementPolicy containerPlacement;

/**
* EventPublisher to fire Replicate and Delete container events.
Expand Down Expand Up @@ -131,12 +132,12 @@ public class ReplicationManager implements MetricsSource {
*
* @param conf OzoneConfiguration
* @param containerManager ContainerManager
* @param containerPlacement ContainerPlacementPolicy
* @param containerPlacement PlacementPolicy
* @param eventPublisher EventPublisher
*/
public ReplicationManager(final ReplicationManagerConfiguration conf,
final ContainerManager containerManager,
final ContainerPlacementPolicy containerPlacement,
final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final LockManager<ContainerID> lockManager) {
this.containerManager = containerManager;
Expand Down Expand Up @@ -474,7 +475,7 @@ private void forceCloseContainer(final ContainerInfo container,

/**
* If the given container is under replicated, identify a new set of
* datanode(s) to replicate the container using ContainerPlacementPolicy
* datanode(s) to replicate the container using PlacementPolicy
* and send replicate container command to the identified datanode(s).
*
* @param container ContainerInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
Expand All @@ -34,22 +35,23 @@ public final class ContainerPlacementPolicyFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerPlacementPolicyFactory.class);

private static final Class<? extends ContainerPlacementPolicy>
private static final Class<? extends PlacementPolicy>
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
SCMContainerPlacementRandom.class;

private ContainerPlacementPolicyFactory() {
}

public static ContainerPlacementPolicy getPolicy(Configuration conf,
final NodeManager nodeManager, NetworkTopology clusterMap,
final boolean fallback, SCMContainerPlacementMetrics metrics)
throws SCMException{
final Class<? extends ContainerPlacementPolicy> placementClass = conf

public static PlacementPolicy getPolicy(
Configuration conf, final NodeManager nodeManager,
NetworkTopology clusterMap, final boolean fallback,
SCMContainerPlacementMetrics metrics) throws SCMException{
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
ContainerPlacementPolicy.class);
Constructor<? extends ContainerPlacementPolicy> constructor;
PlacementPolicy.class);
Constructor<? extends PlacementPolicy> constructor;
try {
constructor = placementClass.getDeclaredConstructor(NodeManager.class,
Configuration.class, NetworkTopology.class, boolean.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
Expand Down Expand Up @@ -65,7 +66,8 @@
* little or no work and the cluster will achieve a balanced distribution
* over time.
*/
public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
public final class SCMContainerPlacementCapacity
extends SCMCommonPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
Expand All @@ -45,7 +46,8 @@
* recommend to use this if the network topology has more layers.
* <p>
*/
public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
public final class SCMContainerPlacementRackAware
extends SCMCommonPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
Expand Down Expand Up @@ -271,8 +273,8 @@ private Node chooseNode(List<Node> excludedNodes, Node affinityNode,
throw new SCMException("No satisfied datanode to meet the" +
" excludedNodes and affinityNode constrains.", null);
}
if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
LOG.debug("Datanode {} is chosen for container. Required size is {}",
if (super.hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
LOG.debug("Datanode {} is chosen. Required size is {}",
node.toString(), sizeRequired);
metrics.incrDatanodeChooseSuccessCount();
if (isFallbacked) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.NodeManager;
Expand All @@ -37,8 +39,8 @@
* Balancer will need to support containers as a feature before this class
* can be practically used.
*/
public final class SCMContainerPlacementRandom extends SCMCommonPolicy
implements ContainerPlacementPolicy {
public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
implements PlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
Expand Down
Loading