@@ -20,6 +20,7 @@

import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -106,6 +107,11 @@ public static RaftPeer toRaftPeer(DatanodeDetails id) {
return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
}

public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) {
return new RaftPeer(
toRaftPeerId(id), toRaftPeerAddressString(id), priority);
}

private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
return toRaftPeers(pipeline.getNodes());
}
@@ -125,6 +131,19 @@ private static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
: RaftGroup.valueOf(DUMMY_GROUP_ID, peers);
}

public static RaftGroup newRaftGroup(RaftGroupId groupId,
List<DatanodeDetails> peers, List<Integer> priorityList) {
assert peers.size() == priorityList.size();

final List<RaftPeer> newPeers = new ArrayList<>();
for (int i = 0; i < peers.size(); i++) {
RaftPeer peer = RatisHelper.toRaftPeer(peers.get(i), priorityList.get(i));
newPeers.add(peer);
}
return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList())
: RaftGroup.valueOf(groupId, newPeers);
}

public static RaftGroup newRaftGroup(RaftGroupId groupId,
Collection<DatanodeDetails> peers) {
final List<RaftPeer> newPeers = peers.stream()
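The new newRaftGroup(RaftGroupId, List&lt;DatanodeDetails&gt;, List&lt;Integer&gt;) overload expects the priority list to be index-aligned with the peer list. Below is a minimal sketch of how a caller could derive such a list, giving the suggested leader a higher Raft priority than its peers; the buildPriorityList helper and the concrete 0/1 priority values are illustrative assumptions, not part of this change, and the snippet assumes the same java.util imports as RatisHelper plus java.util.UUID and DatanodeDetails.

// Hypothetical helper: peers.get(i) is assigned priorityList.get(i); the
// suggested leader (if any) gets a higher priority so Ratis favors it
// during leader election. The values 0 and 1 are illustrative only.
private static List<Integer> buildPriorityList(
    List<DatanodeDetails> peers, UUID suggestedLeaderId) {
  final List<Integer> priorityList =
      new ArrayList<>(Collections.nCopies(peers.size(), 0));
  if (suggestedLeaderId != null) {
    for (int i = 0; i < peers.size(); i++) {
      if (peers.get(i).getUuid().equals(suggestedLeaderId)) {
        priorityList.set(i, 1);
      }
    }
  }
  return priorityList;
}

// Usage sketch:
// RaftGroup group = RatisHelper.newRaftGroup(
//     groupId, peers, buildPriorityList(peers, suggestedLeaderId));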
@@ -321,6 +321,9 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
"ozone.scm.pipeline.allocated.timeout";

public static final String OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY =
"ozone.scm.pipeline.leader-choose.policy";

public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
"5m";

@@ -123,6 +123,7 @@ public enum ResultCodes {
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
INTERNAL_ERROR,
FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY
FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY,
FAILED_TO_INIT_LEADER_CHOOSE_POLICY
}
}
@@ -61,6 +61,8 @@ public final class Pipeline {
private UUID leaderId;
// Timestamp for pipeline upon creation
private Instant creationTimestamp;
// suggested leader id with high priority
private final UUID suggestedLeaderId;

/**
* The immutable properties of pipeline object is used in
@@ -69,13 +71,14 @@
*/
private Pipeline(PipelineID id, ReplicationType type,
ReplicationFactor factor, PipelineState state,
Map<DatanodeDetails, Long> nodeStatus) {
Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) {
this.id = id;
this.type = type;
this.factor = factor;
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = Instant.now();
this.suggestedLeaderId = suggestedLeaderId;
}

/**
@@ -123,6 +126,16 @@ public Instant getCreationTimestamp() {
return creationTimestamp;
}

/**
* Return the suggested leaderId, which is assigned a higher priority than
* the other datanodes in the pipeline.
*
* @return Suggested LeaderId
*/
public UUID getSuggestedLeaderId() {
return suggestedLeaderId;
}

/**
* Set the creation timestamp. Only for protobuf now.
*
@@ -278,6 +291,14 @@ public HddsProtos.Pipeline getProtobufMessage()
builder.setLeaderID128(uuid128);
}

if (suggestedLeaderId != null) {
HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
.setMostSigBits(suggestedLeaderId.getMostSignificantBits())
.setLeastSigBits(suggestedLeaderId.getLeastSignificantBits())
.build();
builder.setSuggestedLeaderID(uuid128);
}

// To save the message size on wire, only transfer the node order based on
// network topology
List<DatanodeDetails> nodes = nodesInOrder.get();
@@ -315,12 +336,20 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
leaderId = UUID.fromString(pipeline.getLeaderID());
}

UUID suggestedLeaderId = null;
if (pipeline.hasSuggestedLeaderID()) {
HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID();
suggestedLeaderId =
new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
}

return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
.setFactor(pipeline.getFactor())
.setType(pipeline.getType())
.setState(PipelineState.fromProtobuf(pipeline.getState()))
.setNodes(nodes)
.setLeaderId(leaderId)
.setSuggestedLeaderId(suggestedLeaderId)
.setNodesInOrder(pipeline.getMemberOrdersList())
.setCreateTimestamp(pipeline.getCreationTimeStamp())
.build();
@@ -392,6 +421,7 @@ public static class Builder {
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Instant creationTimestamp = null;
private UUID suggestedLeaderId = null;

public Builder() {}

@@ -404,6 +434,7 @@ public Builder(Pipeline pipeline) {
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
this.suggestedLeaderId = pipeline.getSuggestedLeaderId();
}

public Builder setId(PipelineID id1) {
@@ -447,13 +478,19 @@ public Builder setCreateTimestamp(long createTimestamp) {
return this;
}

public Builder setSuggestedLeaderId(UUID uuid) {
this.suggestedLeaderId = uuid;
return this;
}

public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(factor);
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
Pipeline pipeline =
new Pipeline(id, type, factor, state, nodeStatus, suggestedLeaderId);
pipeline.setLeaderId(leaderId);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
@@ -484,6 +521,7 @@ public Pipeline build() {
// This branch is for pipeline clone
pipeline.setNodesInOrder(nodesInOrder);
}

return pipeline;
}
}
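The suggested leader id is carried on the wire as a pair of longs, exactly like the existing leaderID128 field. The following self-contained sketch shows that round trip using only java.util.UUID; it mirrors the conversion performed in getProtobufMessage() and getFromProtobuf() above and makes no assumptions about the generated HddsProtos classes.

import java.util.UUID;

public final class UuidBitsRoundTrip {
  public static void main(String[] args) {
    UUID suggestedLeaderId = UUID.randomUUID();

    // Serialize: the same two longs that getProtobufMessage() stores via
    // setMostSigBits()/setLeastSigBits() on HddsProtos.UUID.
    long most = suggestedLeaderId.getMostSignificantBits();
    long least = suggestedLeaderId.getLeastSignificantBits();

    // Deserialize: identical to the reconstruction in getFromProtobuf().
    UUID decoded = new UUID(most, least);

    // The round trip is lossless.
    System.out.println(decoded.equals(suggestedLeaderId)); // true
  }
}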
18 changes: 18 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -857,6 +857,24 @@
stage until it gets scrubbed.
</description>
</property>
<property>
<name>ozone.scm.pipeline.leader-choose.policy</name>
<value>
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy
</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>
The policy used for choosing the desired leader during pipeline creation.
Two policies are currently supported: DefaultLeaderChoosePolicy and MinLeaderCountChoosePolicy.
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy
implements a policy that chooses the leader without considering priority.
org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy
implements a policy that chooses the datanode that currently leads the fewest pipelines.
In the future, policies could also consider:
1. resources: the datanode with the most abundant CPU and memory can be made the leader
2. topology: the datanode nearest to the client can be made the leader
</description>
</property>
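To switch an SCM to the count-based policy described above, the new key can be overridden in the cluster configuration; a sketch, assuming the usual ozone-site.xml override mechanism:

<property>
  <name>ozone.scm.pipeline.leader-choose.policy</name>
  <value>org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy</value>
</property>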
<property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
@@ -17,8 +17,8 @@
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -82,18 +82,20 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
final CreatePipelineCommandProto createCommand =
((CreatePipelineCommand)command).getProto();
final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
final Collection<DatanodeDetails> peers =
final List<DatanodeDetails> peers =
createCommand.getDatanodeList().stream()
.map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList());
final List<Integer> priorityList = createCommand.getPriorityList();

try {
XceiverServerSpi server = ozoneContainer.getWriteChannel();
if (!server.isExist(pipelineID)) {
final RaftGroupId groupId = RaftGroupId.valueOf(
PipelineID.getFromProtobuf(pipelineID).getId());
final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
server.addGroup(pipelineID, peers);
final RaftGroup group =
RatisHelper.newRaftGroup(groupId, peers, priorityList);
server.addGroup(pipelineID, peers, priorityList);
peers.stream().filter(
d -> !d.getUuid().equals(dn.getUuid()))
.forEach(d -> {
@@ -27,7 +27,6 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
import java.io.IOException;
import java.util.Collection;
import java.util.List;

/** A server endpoint that acts as the communication layer for Ozone
@@ -68,9 +67,16 @@ void submitRequest(ContainerCommandRequestProto request,
* Join a new pipeline.
*/
default void addGroup(HddsProtos.PipelineID pipelineId,
Collection<DatanodeDetails> peers) throws IOException {
List<DatanodeDetails> peers) throws IOException {
}

/**
* Join a new pipeline with priority.
*/
default void addGroup(HddsProtos.PipelineID pipelineId,
List<DatanodeDetails> peers,
List<Integer> priorityList) throws IOException {
}

/**
* Exit a pipeline.
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -115,6 +116,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private static final Logger LOG = LoggerFactory
.getLogger(XceiverServerRatis.class);
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
private static final List<Integer> DEFAULT_PRIORITY_LIST =
new ArrayList<>(
Collections.nCopies(HddsProtos.ReplicationFactor.THREE_VALUE, 0));

private static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
@@ -711,10 +715,23 @@ public List<PipelineID> getPipelineIds() {

@Override
public void addGroup(HddsProtos.PipelineID pipelineId,
Collection<DatanodeDetails> peers) throws IOException {
List<DatanodeDetails> peers) throws IOException {
if (peers.size() == getDefaultPriorityList().size()) {
addGroup(pipelineId, peers, getDefaultPriorityList());
} else {
addGroup(pipelineId, peers,
new ArrayList<>(Collections.nCopies(peers.size(), 0)));
}
}

@Override
public void addGroup(HddsProtos.PipelineID pipelineId,
List<DatanodeDetails> peers,
List<Integer> priorityList) throws IOException {
final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
final RaftGroup group =
RatisHelper.newRaftGroup(groupId, peers, priorityList);
GroupManagementRequest request = GroupManagementRequest.newAdd(
clientId, server.getId(), nextCallId(), group);

@@ -864,4 +881,10 @@ private static List<ThreadPoolExecutor> createChunkExecutors(
return ImmutableList.copyOf(executors);
}

/**
* @return the default priority list
*/
public static List<Integer> getDefaultPriorityList() {
return DEFAULT_PRIORITY_LIST;
}
}
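When the two-argument addGroup(...) is called, every peer ends up with priority 0 and no datanode is preferred; DEFAULT_PRIORITY_LIST is simply a cached all-zero list for the common factor-THREE case. Below is a standalone sketch of that fallback logic, assuming nothing beyond java.util; the class and method names are illustrative only.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class DefaultPriorityFallback {
  // Mirrors DEFAULT_PRIORITY_LIST: three zeros for a factor-THREE pipeline.
  private static final List<Integer> DEFAULT_PRIORITY_LIST =
      new ArrayList<>(Collections.nCopies(3, 0));

  // Mirrors the two-argument addGroup(): reuse the cached list when the peer
  // count matches, otherwise build an all-zero list of the right size.
  static List<Integer> prioritiesFor(int peerCount) {
    return peerCount == DEFAULT_PRIORITY_LIST.size()
        ? DEFAULT_PRIORITY_LIST
        : new ArrayList<>(Collections.nCopies(peerCount, 0));
  }

  public static void main(String[] args) {
    System.out.println(prioritiesFor(3)); // [0, 0, 0]
    System.out.println(prioritiesFor(1)); // [0]
  }
}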