diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 8325f0963885..324774d7d77f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.security.cert.X509Certificate; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -106,6 +107,11 @@ public static RaftPeer toRaftPeer(DatanodeDetails id) { return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); } + public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) { + return new RaftPeer( + toRaftPeerId(id), toRaftPeerAddressString(id), priority); + } + private static List<RaftPeer> toRaftPeers(Pipeline pipeline) { return toRaftPeers(pipeline.getNodes()); } @@ -125,6 +131,19 @@ private static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { : RaftGroup.valueOf(DUMMY_GROUP_ID, peers); } + public static RaftGroup newRaftGroup(RaftGroupId groupId, + List<DatanodeDetails> peers, List<Integer> priorityList) { + assert peers.size() == priorityList.size(); + + final List<RaftPeer> newPeers = new ArrayList<>(); + for (int i = 0; i < peers.size(); i++) { + RaftPeer peer = RatisHelper.toRaftPeer(peers.get(i), priorityList.get(i)); + newPeers.add(peer); + } + return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList()) + : RaftGroup.valueOf(groupId, newPeers); + } + public static RaftGroup newRaftGroup(RaftGroupId groupId, Collection<DatanodeDetails> peers) { final List<RaftPeer> newPeers = peers.stream() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 540d2c0fa8f5..7b01e0797f3e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -321,6 +321,9 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT = "ozone.scm.pipeline.allocated.timeout"; + public static final String OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY = + "ozone.scm.pipeline.leader-choose.policy"; + public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT = "5m"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index 0146eaed6e33..48a8e059d97b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -123,6 +123,7 @@ public enum ResultCodes { FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY, FAILED_TO_ALLOCATE_ENOUGH_BLOCKS, INTERNAL_ERROR, - FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY + FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY, + FAILED_TO_INIT_LEADER_CHOOSE_POLICY } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index d5c10242eb95..a4787efb80a9 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -61,6 +61,8 @@ public final class
Pipeline { private UUID leaderId; // Timestamp for pipeline upon creation private Instant creationTimestamp; + // suggested leader id with high priority + private final UUID suggestedLeaderId; /** * The immutable properties of pipeline object is used in @@ -69,13 +71,14 @@ public final class Pipeline { */ private Pipeline(PipelineID id, ReplicationType type, ReplicationFactor factor, PipelineState state, - Map<DatanodeDetails, Long> nodeStatus) { + Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) { this.id = id; this.type = type; this.factor = factor; this.state = state; this.nodeStatus = nodeStatus; this.creationTimestamp = Instant.now(); + this.suggestedLeaderId = suggestedLeaderId; } /** @@ -123,6 +126,16 @@ public Instant getCreationTimestamp() { return creationTimestamp; } + /** + * Return the suggested leaderId which has a high priority among DNs of the + * pipeline. + * + * @return Suggested LeaderId + */ + public UUID getSuggestedLeaderId() { + return suggestedLeaderId; + } + /** * Set the creation timestamp. Only for protobuf now. * @@ -278,6 +291,14 @@ public HddsProtos.Pipeline getProtobufMessage() builder.setLeaderID128(uuid128); } + if (suggestedLeaderId != null) { + HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder() + .setMostSigBits(suggestedLeaderId.getMostSignificantBits()) + .setLeastSigBits(suggestedLeaderId.getLeastSignificantBits()) + .build(); + builder.setSuggestedLeaderID(uuid128); + } + // To save the message size on wire, only transfer the node order based on // network topology List<DatanodeDetails> nodes = nodesInOrder.get(); @@ -315,12 +336,20 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) leaderId = UUID.fromString(pipeline.getLeaderID()); } + UUID suggestedLeaderId = null; + if (pipeline.hasSuggestedLeaderID()) { + HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID(); + suggestedLeaderId = + new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()); + } + return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId())) .setFactor(pipeline.getFactor()) .setType(pipeline.getType()) .setState(PipelineState.fromProtobuf(pipeline.getState())) .setNodes(nodes) .setLeaderId(leaderId) + .setSuggestedLeaderId(suggestedLeaderId) .setNodesInOrder(pipeline.getMemberOrdersList()) .setCreateTimestamp(pipeline.getCreationTimeStamp()) .build(); @@ -392,6 +421,7 @@ public static class Builder { private List<DatanodeDetails> nodesInOrder = null; private UUID leaderId = null; private Instant creationTimestamp = null; + private UUID suggestedLeaderId = null; public Builder() {} @@ -404,6 +434,7 @@ public Builder(Pipeline pipeline) { this.nodesInOrder = pipeline.nodesInOrder.get(); this.leaderId = pipeline.getLeaderId(); this.creationTimestamp = pipeline.getCreationTimestamp(); + this.suggestedLeaderId = pipeline.getSuggestedLeaderId(); } public Builder setId(PipelineID id1) { @@ -447,13 +478,19 @@ public Builder setCreateTimestamp(long createTimestamp) { return this; } + public Builder setSuggestedLeaderId(UUID uuid) { + this.suggestedLeaderId = uuid; + return this; + } + public Pipeline build() { Preconditions.checkNotNull(id); Preconditions.checkNotNull(type); Preconditions.checkNotNull(factor); Preconditions.checkNotNull(state); Preconditions.checkNotNull(nodeStatus); - Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus); + Pipeline pipeline = + new Pipeline(id, type, factor, state, nodeStatus, suggestedLeaderId); pipeline.setLeaderId(leaderId); // overwrite with original creationTimestamp if (creationTimestamp != null) { @@ -484,6 +521,7 @@ public Pipeline build() { // This branch is
for pipeline clone pipeline.setNodesInOrder(nodesInOrder); } + return pipeline; } } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 70d35a340168..b23eac61acb7 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -857,6 +857,24 @@ stage until it gets scrubbed. + <property> + <name>ozone.scm.pipeline.leader-choose.policy</name> + <value> + org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy + </value> + <tag>OZONE, SCM, PIPELINE</tag> + <description> + The policy used for choosing the desired leader for pipeline creation. + Two policies are currently supported: DefaultLeaderChoosePolicy and MinLeaderCountChoosePolicy. + org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy + implements a policy that does not choose a leader, so Ratis elects the leader without depending on priority. + org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy + implements a policy that chooses the leader with the minimum existing leader count. + In the future, policies could be added that consider: + 1. resources: the datanode with the most abundant CPU and memory can be made the leader + 2. topology: the datanode nearest to the client can be made the leader + </description> + </property> ozone.scm.container.size 5GB diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index e435f7bb4bf5..8f41fe91868b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -17,8 +17,8 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import java.io.IOException; -import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; +import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.hdds.conf.ConfigurationSource; @@ -82,18 +82,20 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, final CreatePipelineCommandProto createCommand = ((CreatePipelineCommand)command).getProto(); final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID(); - final Collection<DatanodeDetails> peers = + final List<DatanodeDetails> peers = createCommand.getDatanodeList().stream() .map(DatanodeDetails::getFromProtoBuf) .collect(Collectors.toList()); + final List<Integer> priorityList = createCommand.getPriorityList(); try { XceiverServerSpi server = ozoneContainer.getWriteChannel(); if (!server.isExist(pipelineID)) { final RaftGroupId groupId = RaftGroupId.valueOf( PipelineID.getFromProtobuf(pipelineID).getId()); - final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers); - server.addGroup(pipelineID, peers); + final RaftGroup group = + RatisHelper.newRaftGroup(groupId, peers, priorityList); + server.addGroup(pipelineID, peers, priorityList); peers.stream().filter( d -> !d.getUuid().equals(dn.getUuid())) .forEach(d -> { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index d8dfefdb5f64..480561270ec4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto; import java.io.IOException; -import java.util.Collection; import java.util.List; /** A server endpoint that acts as the communication layer for Ozone @@ -68,9 +67,16 @@ void submitRequest(ContainerCommandRequestProto request, * Join a new pipeline. */ default void addGroup(HddsProtos.PipelineID pipelineId, - Collection<DatanodeDetails> peers) throws IOException { + List<DatanodeDetails> peers) throws IOException { } + /** + * Join a new pipeline with priority. + */ + default void addGroup(HddsProtos.PipelineID pipelineId, + List<DatanodeDetails> peers, + List<Integer> priorityList) throws IOException { + } /** * Exit a pipeline. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 573e681d61ce..0b0756c3a889 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -115,6 +116,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { private static final Logger LOG = LoggerFactory .getLogger(XceiverServerRatis.class); private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + private static final List<Integer> DEFAULT_PRIORITY_LIST = + new ArrayList<>( + Collections.nCopies(HddsProtos.ReplicationFactor.THREE_VALUE, 0)); private static long nextCallId() { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; @@ -711,10 +715,23 @@ public List<PipelineID> getPipelineIds() { @Override public void addGroup(HddsProtos.PipelineID pipelineId, - Collection<DatanodeDetails> peers) throws IOException { + List<DatanodeDetails> peers) throws IOException { + if (peers.size() == getDefaultPriorityList().size()) { + addGroup(pipelineId, peers, getDefaultPriorityList()); + } else { + addGroup(pipelineId, peers, + new ArrayList<>(Collections.nCopies(peers.size(), 0))); + } + } + + @Override + public void addGroup(HddsProtos.PipelineID pipelineId, + List<DatanodeDetails> peers, + List<Integer> priorityList) throws IOException { final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId); final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId()); - final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers); + final RaftGroup group = + RatisHelper.newRaftGroup(groupId, peers, priorityList); GroupManagementRequest request = GroupManagementRequest.newAdd( clientId, server.getId(), nextCallId(), group); @@ -864,4 +881,10 @@ private static List createChunkExecutors( return ImmutableList.copyOf(executors); } + /** + * @return the default priority list. + */ + public static List<Integer>
getDefaultPriorityList() { return DEFAULT_PRIORITY_LIST; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java index 9e22cbcce330..6fdb4ce451c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java @@ -25,7 +25,10 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -35,10 +38,14 @@ public class CreatePipelineCommand extends SCMCommand<CreatePipelineCommandProto> { + private static final Integer HIGH_PRIORITY = 1; + private static final Integer LOW_PRIORITY = 0; + private final PipelineID pipelineID; private final ReplicationFactor factor; private final ReplicationType type; private final List<DatanodeDetails> nodelist; + private final List<Integer> priorityList; public CreatePipelineCommand(final PipelineID pipelineID, final ReplicationType type, final ReplicationFactor factor, @@ -48,16 +55,49 @@ public CreatePipelineCommand(final PipelineID pipelineID, this.factor = factor; this.type = type; this.nodelist = datanodeList; + if (datanodeList.size() == + XceiverServerRatis.getDefaultPriorityList().size()) { + this.priorityList = XceiverServerRatis.getDefaultPriorityList(); + } else { + this.priorityList = + new ArrayList<>(Collections.nCopies(datanodeList.size(), 0)); + } + } + + public CreatePipelineCommand(final PipelineID pipelineID, + final ReplicationType type, final ReplicationFactor factor, + final List<DatanodeDetails> datanodeList, + final DatanodeDetails suggestedLeader) { + super(); + this.pipelineID = pipelineID; + this.factor = factor; + this.type = type; + this.nodelist = datanodeList; + this.priorityList = new ArrayList<>(); + initPriorityList(datanodeList, suggestedLeader); + } + + private void initPriorityList( + List<DatanodeDetails> dns, DatanodeDetails suggestedLeader) { + for (DatanodeDetails dn : dns) { + if (dn.equals(suggestedLeader)) { + priorityList.add(HIGH_PRIORITY); + } else { + priorityList.add(LOW_PRIORITY); + } + } } public CreatePipelineCommand(long cmdId, final PipelineID pipelineID, final ReplicationType type, final ReplicationFactor factor, - final List<DatanodeDetails> datanodeList) { + final List<DatanodeDetails> datanodeList, + final List<Integer> priorityList) { super(cmdId); this.pipelineID = pipelineID; this.factor = factor; this.type = type; this.nodelist = datanodeList; + this.priorityList = priorityList; } /** @@ -80,6 +120,7 @@ public CreatePipelineCommandProto getProto() { .addAllDatanode(nodelist.stream() .map(DatanodeDetails::getProtoBufMessage) .collect(Collectors.toList())) + .addAllPriority(priorityList) .build(); } @@ -91,7 +132,8 @@ public static CreatePipelineCommand getFromProtobuf( createPipelineProto.getType(), createPipelineProto.getFactor(), createPipelineProto.getDatanodeList().stream() .map(DatanodeDetails::getFromProtoBuf) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + createPipelineProto.getPriorityList()); } public PipelineID getPipelineID() { diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java index 8ee6ac7cac0a..ede0b94de476 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java @@ -48,7 +48,9 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; /** @@ -113,8 +115,11 @@ public void testPipelineCreation() throws IOException { commandHandler.handle(command, ozoneContainer, stateContext, connectionManager); + List<Integer> priorityList = + new ArrayList<>(Collections.nCopies(datanodes.size(), 0)); + Mockito.verify(writeChanel, Mockito.times(1)) - .addGroup(pipelineID.getProtobuf(), datanodes); + .addGroup(pipelineID.getProtobuf(), datanodes, priorityList); Mockito.verify(raftClient, Mockito.times(2)) .groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class)); diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto index 0c9b26142558..b43a74cd0679 100644 --- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto +++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto @@ -100,6 +100,7 @@ message Pipeline { optional string leaderID = 6; repeated uint32 memberOrders = 7; optional uint64 creationTimeStamp = 8; + optional UUID suggestedLeaderID = 9; // TODO(runzhiwang): when leaderID is gone, specify 6 as the index of leaderID128 optional UUID leaderID128 = 100; } diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto index 1dc4bcd4d249..4f610ff24b1a 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto @@ -380,6 +380,7 @@ message CreatePipelineCommandProto { required ReplicationFactor factor = 3; repeated DatanodeDetailsProto datanode = 4; required int64 cmdId = 5; + repeated int32 priority = 6; } /** diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto index 682d4d9c73b4..7d59bd72ef4c 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto @@ -115,6 +115,7 @@ enum Status { FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27; INTERNAL_ERROR = 29; FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY = 30; + FAILED_TO_INIT_LEADER_CHOOSE_POLICY = 31; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index bb56a0380b1b..8bc5bd5ededd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -59,7 +59,8 @@ void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID) pipelineStateMap.addContainerToPipeline(pipelineId, containerID); } - Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException { + public Pipeline getPipeline(PipelineID pipelineID) + throws PipelineNotFoundException { return pipelineStateMap.getPipeline(pipelineID); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index e39f14126991..830db18d72e2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -31,11 +31,14 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState; +import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy; +import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; +import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +55,10 @@ public class RatisPipelineProvider extends PipelineProvider { private final PipelinePlacementPolicy placementPolicy; private int pipelineNumberLimit; private int maxPipelinePerDatanode; + private final LeaderChoosePolicy leaderChoosePolicy; - RatisPipelineProvider(NodeManager nodeManager, + @VisibleForTesting + public RatisPipelineProvider(NodeManager nodeManager, PipelineStateManager stateManager, ConfigurationSource conf, EventPublisher eventPublisher) { super(nodeManager, stateManager); @@ -67,6 +72,12 @@ public class RatisPipelineProvider extends PipelineProvider { String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT); this.maxPipelinePerDatanode = dnLimit == null ? 
0 : Integer.parseInt(dnLimit); + try { + leaderChoosePolicy = LeaderChoosePolicyFactory + .getPolicy(conf, nodeManager, stateManager); + } catch (Exception e) { + throw new RuntimeException(e); + } } private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { @@ -98,8 +109,14 @@ private boolean exceedPipelineNumberLimit(ReplicationFactor factor) { return false; } + @VisibleForTesting + public LeaderChoosePolicy getLeaderChoosePolicy() { + return leaderChoosePolicy; + } + @Override - public Pipeline create(ReplicationFactor factor) throws IOException { + public synchronized Pipeline create(ReplicationFactor factor) + throws IOException { if (exceedPipelineNumberLimit(factor)) { throw new SCMException("Ratis pipeline number meets the limit: " + pipelineNumberLimit + " factor : " + @@ -121,16 +138,22 @@ public Pipeline create(ReplicationFactor factor) throws IOException { throw new IllegalStateException("Unknown factor: " + factor.name()); } + DatanodeDetails suggestedLeader = leaderChoosePolicy.chooseLeader(dns); + Pipeline pipeline = Pipeline.newBuilder() .setId(PipelineID.randomId()) .setState(PipelineState.ALLOCATED) .setType(ReplicationType.RATIS) .setFactor(factor) .setNodes(dns) + .setSuggestedLeaderId( + suggestedLeader != null ? suggestedLeader.getUuid() : null) .build(); // Send command to datanodes to create pipeline - final CreatePipelineCommand createCommand = + final CreatePipelineCommand createCommand = suggestedLeader != null ? + new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), + factor, dns, suggestedLeader) : new CreatePipelineCommand(pipeline.getId(), pipeline.getType(), factor, dns); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java new file mode 100644 index 000000000000..415cf10a2908 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; + +import java.util.List; + +/** + * The default leader choose policy. + * It does not choose a leader, so all nodes have the same priority + * and Ratis elects the leader without depending on priority. + */ +public class DefaultLeaderChoosePolicy extends LeaderChoosePolicy { + + public DefaultLeaderChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + super(nodeManager, pipelineStateManager); + } + + @Override + public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) { + return null; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java new file mode 100644 index 000000000000..04c155b356ce --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; + +import java.util.List; + +/** + * A {@link LeaderChoosePolicy} supports choosing a leader from a datanode list. + */ +public abstract class LeaderChoosePolicy { + + private final NodeManager nodeManager; + private final PipelineStateManager pipelineStateManager; + + public LeaderChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + this.nodeManager = nodeManager; + this.pipelineStateManager = pipelineStateManager; + } + + /** + * Given an initial list of datanodes, return one of the datanodes. + * + * @param dns list of datanodes. + * @return one of the datanodes. + */ + public abstract DatanodeDetails chooseLeader(List<DatanodeDetails> dns); + + protected NodeManager getNodeManager() { + return nodeManager; + } + + protected PipelineStateManager getPipelineStateManager() { + return pipelineStateManager; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java new file mode 100644 index 000000000000..8e1a0ff49784 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; + +/** + * A factory to create leader choose policy instance based on configuration + * property {@link ScmConfigKeys#OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY}. + */ +public final class LeaderChoosePolicyFactory { + private static final Logger LOG = + LoggerFactory.getLogger(LeaderChoosePolicyFactory.class); + + private static final Class<? extends LeaderChoosePolicy> + OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT = + MinLeaderCountChoosePolicy.class; + + private LeaderChoosePolicyFactory() { + } + + + public static LeaderChoosePolicy getPolicy( + ConfigurationSource conf, final NodeManager nodeManager, + final PipelineStateManager pipelineStateManager) throws SCMException { + final Class<? extends LeaderChoosePolicy> policyClass = conf + .getClass(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT, + LeaderChoosePolicy.class); + Constructor<? extends LeaderChoosePolicy> constructor; + try { + constructor = policyClass.getDeclaredConstructor(NodeManager.class, + PipelineStateManager.class); + LOG.info("Create leader choose policy of type {}", + policyClass.getCanonicalName()); + } catch (NoSuchMethodException e) { + String msg = "Failed to find constructor(NodeManager, " + + "PipelineStateManager) for class " + + policyClass.getCanonicalName(); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_INIT_LEADER_CHOOSE_POLICY); + } + + try { + return constructor.newInstance(nodeManager, pipelineStateManager); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate class " + + policyClass.getCanonicalName() + " for " + e.getMessage()); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java new file mode 100644 index 000000000000..d4068b9e130d --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The minimum leader count choose policy that chooses the leader + * which has the minimum existing leader count. + */ +public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy { + + private static final Logger LOG = + LoggerFactory.getLogger(MinLeaderCountChoosePolicy.class); + + public MinLeaderCountChoosePolicy( + NodeManager nodeManager, PipelineStateManager pipelineStateManager) { + super(nodeManager, pipelineStateManager); + } + + @Override + public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) { + Map<DatanodeDetails, Integer> suggestedLeaderCount = + getSuggestedLeaderCount( + dns, getNodeManager(), getPipelineStateManager()); + int minLeaderCount = Integer.MAX_VALUE; + DatanodeDetails suggestedLeader = null; + + for (Map.Entry<DatanodeDetails, Integer> entry : + suggestedLeaderCount.entrySet()) { + if (entry.getValue() < minLeaderCount) { + minLeaderCount = entry.getValue(); + suggestedLeader = entry.getKey(); + } + } + + return suggestedLeader; + } + + private Map<DatanodeDetails, Integer> getSuggestedLeaderCount( + List<DatanodeDetails> dns, NodeManager nodeManager, + PipelineStateManager pipelineStateManager) { + Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>(); + for (DatanodeDetails dn : dns) { + suggestedLeaderCount.put(dn, 0); + + Set<PipelineID> pipelineIDSet = nodeManager.getPipelines(dn); + for (PipelineID pipelineID : pipelineIDSet) { + try { + Pipeline pipeline = pipelineStateManager.getPipeline(pipelineID); + if (!pipeline.isClosed() + && dn.getUuid().equals(pipeline.getSuggestedLeaderId())) { + suggestedLeaderCount.put(dn, suggestedLeaderCount.get(dn) + 1); + } + } catch (PipelineNotFoundException e) { + LOG.debug("Pipeline not found in pipeline state manager : {}", + pipelineID, e); + } + } + } + + return suggestedLeaderCount; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java new file mode 100644 index 000000000000..e29369a215d4 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; +// Various leader choosing algorithms. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java new file mode 100644 index 000000000000..53905e7f45de --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfig; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +/** + * Unit tests for {@link LeaderChoosePolicy}. + */ +public class TestLeaderChoosePolicy { + private OzoneConfiguration conf; + + private ScmConfig scmConfig; + + @Before + public void setup() { + // initialize the configuration + conf = new OzoneConfiguration(); + scmConfig = conf.getObject(ScmConfig.class); + } + + @Test + public void testDefaultPolicy() { + RatisPipelineProvider ratisPipelineProvider = new RatisPipelineProvider( + mock(NodeManager.class), + mock(PipelineStateManager.class), + conf, + mock(EventPublisher.class)); + Assert.assertSame( + ratisPipelineProvider.getLeaderChoosePolicy().getClass(), + DefaultLeaderChoosePolicy.class); + } + + @Test(expected = RuntimeException.class) + public void testClassNotImplemented() { + // set a class not implemented + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".HelloWorld"); + new RatisPipelineProvider( + mock(NodeManager.class), + mock(PipelineStateManager.class), + conf, + mock(EventPublisher.class)); + + // expecting exception + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java new file mode 100644 index 000000000000..ecf1c2f05ac7 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *      http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; + +/** + * Tests for LeaderChoosePolicy. + */ +@Ignore +public class TestLeaderChoosePolicy { + + private static MiniOzoneCluster cluster; + private OzoneConfiguration conf = new OzoneConfiguration(); + private static PipelineManager pipelineManager; + + public void init(int numDatanodes, int datanodePipelineLimit) + throws Exception { + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, + GenericTestUtils.getRandomizedTempPath()); + conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, datanodePipelineLimit); + + cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(numDatanodes) + .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3) + .setHbInterval(2000) + .setHbProcessorInterval(1000) + .build(); + cluster.waitForClusterToBeReady(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + pipelineManager = scm.getPipelineManager(); + } + + @After + public void cleanup() { + cluster.shutdown(); + } + + private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn) + throws Exception { + List<Pipeline> pipelines = pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN); + + for (Pipeline pipeline : pipelines) { + LambdaTestUtils.await(30000, 500, () -> + pipeline.getLeaderId().equals(pipeline.getSuggestedLeaderId())); + } + + Map<UUID, Integer> leaderCount = new HashMap<>(); + for (Pipeline pipeline : pipelines) { + UUID leader = pipeline.getLeaderId(); + if (!leaderCount.containsKey(leader)) { + leaderCount.put(leader, 0); + } + + leaderCount.put(leader, leaderCount.get(leader) + 1); + } + + Assert.assertTrue(leaderCount.size() == dnNum); + for (UUID key : leaderCount.keySet()) { + Assert.assertTrue(leaderCount.get(key) == leaderNumOfEachDn); + } + } + + @Test(timeout = 360000) + public void testRestoreSuggestedLeader() throws Exception { + conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".MinLeaderCountChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int leaderNumOfEachDn = dnPipelineLimit / dnNum; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure pipelines are created + waitForPipelines(pipelineNum); + // No Factor ONE pipeline is auto created.
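+ // (auto-creation of factor-ONE pipelines was disabled above via OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, so none should exist)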
Assert.assertEquals(0, pipelineManager.getPipelines( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE).size()); + + // pipelineNum pipelines in 3 datanodes, + // each datanode has leaderNumOfEachDn leaders after balance + checkLeaderBalance(dnNum, leaderNumOfEachDn); + List<Pipeline> pipelinesBeforeRestart = + cluster.getStorageContainerManager().getPipelineManager() + .getPipelines(); + + cluster.restartStorageContainerManager(true); + + checkLeaderBalance(dnNum, leaderNumOfEachDn); + List<Pipeline> pipelinesAfterRestart = + cluster.getStorageContainerManager().getPipelineManager() + .getPipelines(); + + Assert.assertEquals( + pipelinesBeforeRestart.size(), pipelinesAfterRestart.size()); + + for (Pipeline p : pipelinesBeforeRestart) { + boolean equal = false; + for (Pipeline q : pipelinesAfterRestart) { + if (p.getId().equals(q.getId()) + && p.getSuggestedLeaderId().equals(q.getSuggestedLeaderId())) { + equal = true; + } + } + + Assert.assertTrue(equal); + } + } + + @Test(timeout = 360000) + public void testMinLeaderCountChoosePolicy() throws Exception { + conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".MinLeaderCountChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int leaderNumOfEachDn = dnPipelineLimit / dnNum; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure pipelines are created + waitForPipelines(pipelineNum); + // No Factor ONE pipeline is auto created. + Assert.assertEquals(0, pipelineManager.getPipelines( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.ONE).size()); + + // pipelineNum pipelines in 3 datanodes, + // each datanode has leaderNumOfEachDn leaders after balance + checkLeaderBalance(dnNum, leaderNumOfEachDn); + + Random r = new Random(0); + for (int i = 0; i < 10; i++) { + // destroy some pipelines, wait for new pipelines to be created, + // then check leader balance + + List<Pipeline> pipelines = pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN); + + int destroyNum = r.nextInt(pipelines.size()); + for (int k = 0; k <= destroyNum; k++) { + pipelineManager.finalizeAndDestroyPipeline(pipelines.get(k), false); + } + + waitForPipelines(pipelineNum); + + checkLeaderBalance(dnNum, leaderNumOfEachDn); + } + } + + @Test(timeout = 60000) + public void testDefaultLeaderChoosePolicy() throws Exception { + conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false); + conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY, + "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" + + ".DefaultLeaderChoosePolicy"); + int dnNum = 3; + int dnPipelineLimit = 3; + int pipelineNum = 3; + + init(dnNum, dnPipelineLimit); + // make sure pipelines are created + waitForPipelines(pipelineNum); + } + + private void waitForPipelines(int numPipelines) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) + .size() >= numPipelines, 100, 60000); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java index bd677db65f65..62369000616d ---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java @@ -169,6 +169,6 @@ private void waitForPipelines(int numPipelines) GenericTestUtils.waitFor(() -> pipelineManager .getPipelines(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN) - .size() >= numPipelines, 100, 40000); + .size() >= numPipelines, 100, 60000); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 9a9f0c775f8c..46e3d673a3b4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -102,7 +102,7 @@ public static void afterClass() { FileUtils.deleteQuietly(READ_TMP); } - @Test(timeout = 30000) + @Test(timeout = 60000) public void testStartMultipleDatanodes() throws Exception { final int numberOfNodes = 3; cluster = MiniOzoneCluster.newBuilder(conf) @@ -290,7 +290,7 @@ private void createMalformedIDFile(File malformedFile) * Test that a DN can register with SCM even if it was started before the SCM. * @throws Exception */ - @Test (timeout = 60000) + @Test (timeout = 100000) public void testDNstartAfterSCM() throws Exception { // Start a cluster with 3 DN cluster = MiniOzoneCluster.newBuilder(conf) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java index 57c70613b47b..5ddde8a3a026 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java @@ -42,7 +42,7 @@ public class TestOzoneManagerRocksDBLogging { private RocksDBConfiguration dbConf; @Rule - public Timeout timeout = new Timeout(60000); + public Timeout timeout = new Timeout(100000); private static GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.captureLogs(DBStoreBuilder.ROCKS_DB_LOGGER);
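Taken together, the patch wires the leader preference from SCM down to Ratis as a per-member priority list: RatisPipelineProvider asks the configured LeaderChoosePolicy for a suggested leader, CreatePipelineCommand marks that datanode with HIGH_PRIORITY (1) and the rest with LOW_PRIORITY (0), and the datanode side builds the RaftGroup from prioritized RaftPeers so that Ratis' priority-aware election favors the suggested node. A minimal, self-contained sketch of the priority-list derivation (illustrative Java only: hypothetical class name, strings standing in for DatanodeDetails; not code from the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PriorityListSketch {
  private static final int HIGH_PRIORITY = 1;
  private static final int LOW_PRIORITY = 0;

  // Mirrors CreatePipelineCommand#initPriorityList: one entry per pipeline
  // member, in member order; only the suggested leader is raised.
  static List<Integer> priorityList(List<String> members, String suggestedLeader) {
    List<Integer> priorities = new ArrayList<>(members.size());
    for (String member : members) {
      priorities.add(member.equals(suggestedLeader) ? HIGH_PRIORITY : LOW_PRIORITY);
    }
    return priorities;
  }

  public static void main(String[] args) {
    // Prints [0, 1, 0]: dn2 would be favoured in the Raft leader election.
    System.out.println(priorityList(Arrays.asList("dn1", "dn2", "dn3"), "dn2"));
  }
}

When no leader is suggested (DefaultLeaderChoosePolicy returns null), every entry stays 0 and the election is unbiased, which is why XceiverServerRatis also keeps an all-zero DEFAULT_PRIORITY_LIST for the factor-THREE case. Operators opt into leader balancing by setting ozone.scm.pipeline.leader-choose.policy to MinLeaderCountChoosePolicy, as the integration test above does.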