Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@

package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;

/**
* Represents a group of datanodes which store a container.
Expand All @@ -50,6 +53,8 @@ public final class Pipeline {
private Map<DatanodeDetails, Long> nodeStatus;
// nodes with ordered distance to client
private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
// Current reported Leader for the pipeline
private UUID leaderId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: We can make it strongly typed by changing the type to DatanodeDetails.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if I do that, then in order to construct a Pipeline from proto I have to look up the DatanodeDetails by UUID, and that mapping lives in a map in NodeManager. I am not sure why making this a DatanodeDetails is better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just that it would make explicit what this UUID represents.


/**
* The immutable properties of pipeline object is used in
Expand Down Expand Up @@ -102,6 +107,22 @@ public PipelineState getPipelineState() {
return state;
}

/**
* Returns the UUID of the datanode currently recorded as this pipeline's
* leader.
*
* @return the leader datanode's UUID, or {@code null} if no leader has been
* recorded for this pipeline.
*/
public UUID getLeaderId() {
return leaderId;
}

/**
* Records the UUID of the datanode reported as this pipeline's leader.
* Unlike the properties fixed at construction, the leader id may be updated
* after the pipeline has been built; the setter is package-private.
*
* @param leaderId UUID of the leader datanode; {@code null} clears it.
*/
void setLeaderId(UUID leaderId) {
this.leaderId = leaderId;
}

/**
* Returns the list of nodes which form this pipeline.
*
Expand Down Expand Up @@ -154,13 +175,13 @@ void reportDatanode(DatanodeDetails dn) throws IOException {
nodeStatus.put(dn, System.currentTimeMillis());
}

boolean isHealthy() {
public boolean isHealthy() {
for (Long reportedTime : nodeStatus.values()) {
if (reportedTime < 0) {
return false;
}
}
return true;
return leaderId != null;
}

public boolean isEmpty() {
Expand All @@ -174,7 +195,7 @@ public HddsProtos.Pipeline getProtobufMessage()
.setType(type)
.setFactor(factor)
.setState(PipelineState.getProtobuf(state))
.setLeaderID("")
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.addAllMembers(nodeStatus.keySet().stream()
.map(DatanodeDetails::getProtoBufMessage)
.collect(Collectors.toList()));
Expand Down Expand Up @@ -206,6 +227,8 @@ public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
.setFactor(pipeline.getFactor())
.setType(pipeline.getType())
.setState(PipelineState.fromProtobuf(pipeline.getState()))
.setLeaderId(StringUtils.isNotEmpty(pipeline.getLeaderID()) ?
UUID.fromString(pipeline.getLeaderID()) : null)
.setNodes(pipeline.getMembersList().stream()
.map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
.setNodesInOrder(pipeline.getMemberOrdersList())
Expand Down Expand Up @@ -274,6 +297,7 @@ public static class Builder {
private Map<DatanodeDetails, Long> nodeStatus = null;
private List<Integer> nodeOrder = null;
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;

public Builder() {}

Expand Down Expand Up @@ -306,6 +330,11 @@ public Builder setState(PipelineState state1) {
return this;
}

/**
* Sets the leader UUID that {@link #build()} applies to the new pipeline.
*
* @param leaderId1 UUID of the leader datanode, or {@code null} if unknown.
* @return this builder.
*/
public Builder setLeaderId(UUID leaderId1) {
this.leaderId = leaderId1;
return this;
}

public Builder setNodes(List<DatanodeDetails> nodes) {
this.nodeStatus = new LinkedHashMap<>();
nodes.forEach(node -> nodeStatus.put(node, -1L));
Expand All @@ -324,6 +353,7 @@ public Pipeline build() {
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);

if (nodeOrder != null && !nodeOrder.isEmpty()) {
// This branch is for build from ProtoBuf
Expand Down
12 changes: 6 additions & 6 deletions hadoop-hdds/common/src/main/proto/hdds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ enum PipelineState {
}

message Pipeline {
required string leaderID = 1;
repeated DatanodeDetailsProto members = 2;
repeated DatanodeDetailsProto members = 1;
// TODO: remove the state and leaderID from this class
optional PipelineState state = 3 [default = PIPELINE_ALLOCATED];
optional ReplicationType type = 4 [default = STAND_ALONE];
optional ReplicationFactor factor = 5 [default = ONE];
required PipelineID id = 6;
optional PipelineState state = 2 [default = PIPELINE_ALLOCATED];
optional ReplicationType type = 3 [default = STAND_ALONE];
optional ReplicationFactor factor = 4 [default = ONE];
required PipelineID id = 5;
optional string leaderID = 6;
Comment on lines -70 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no real change here, can we revert it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The leaderID was marked as optional.

repeated uint32 memberOrders = 7;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

/**
* Publishes Pipeline which will be sent to SCM as part of heartbeat.
* PipelineReport consist of the following information about each containers:
* PipelineReport consists of the following information about each pipeline:
* - pipelineID
*
* - leaderID
*/
public class PipelineReportPublisher extends
ReportPublisher<PipelineReportsProto> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerProxy;
Expand Down Expand Up @@ -854,4 +856,10 @@ public void close() throws IOException {
executor.shutdown();
}
}

/**
* Forwards a leader-change notification to {@code ratisServer}.
*
* @param groupMemberId member id whose group id identifies the affected
* Raft group.
* @param raftPeerId id of the peer reported as the new leader.
*/
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId raftPeerId) {
ratisServer.handleLeaderChangedNotification(groupMemberId, raftPeerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand Down Expand Up @@ -68,12 +69,14 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Collections;
import java.util.Set;
import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -107,6 +110,9 @@ private static long nextCallId() {
// TODO: Remove the gids set when Ratis supports an api to query active
// pipelines
private final Set<RaftGroupId> raftGids = new HashSet<>();
private final RaftPeerId raftPeerId;
// Per-group flag: true if this datanode was last reported as the group's leader
private Map<RaftGroupId, Boolean> groupLeaderMap = new ConcurrentHashMap<>();

private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, ContainerController containerController,
Expand Down Expand Up @@ -136,9 +142,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
TimeUnit.MILLISECONDS);
this.dispatcher = dispatcher;
this.containerController = containerController;
this.raftPeerId = RatisHelper.toRaftPeerId(dd);

RaftServer.Builder builder =
RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd))
RaftServer.newBuilder().setServerId(raftPeerId)
.setProperties(serverProperties)
.setStateMachineRegistry(this::getStateMachine);
if (tlsConfig != null) {
Expand Down Expand Up @@ -593,6 +600,7 @@ public List<PipelineReport> getPipelineReport() {
for (RaftGroupId groupId : gids) {
reports.add(PipelineReport.newBuilder()
.setPipelineID(PipelineID.valueOf(groupId.getUuid()).getProtobuf())
.setIsLeader(groupLeaderMap.getOrDefault(groupId, Boolean.FALSE))
.build());
}
return reports;
Expand Down Expand Up @@ -676,9 +684,26 @@ public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException {

/**
* Stops tracking a Raft group: removes it from the set of known group ids
* and discards its entry in the group leader map.
*
* @param gid id of the group to stop tracking.
*/
void notifyGroupRemove(RaftGroupId gid) {
raftGids.remove(gid);
// Drop the recorded leadership flag for this group as well
groupLeaderMap.remove(gid);
}

/**
* Starts tracking a Raft group by adding its id to the set of known groups.
*
* @param gid id of the group to track.
*/
void notifyGroupAdd(RaftGroupId gid) {
raftGids.add(gid);
}

/**
 * Handles a leader-change notification for a Raft group.
 *
 * Records whether this server is the newly reported leader of the group
 * and, when it is and a context is available, queues a fresh pipeline
 * report and triggers a heartbeat so the report is sent right away.
 *
 * @param groupMemberId member id whose group id identifies the group.
 * @param raftPeerId1 id of the peer reported as the new leader.
 */
void handleLeaderChangedNotification(RaftGroupMemberId groupMemberId,
    RaftPeerId raftPeerId1) {
  final RaftGroupId groupId = groupMemberId.getGroupId();
  LOG.info("Leader change notification received for group: {} with new " +
      "leaderId: {}", groupId, raftPeerId1);

  // Remember the outcome so it can be included in later pipeline reports.
  final boolean isLocalLeader = this.raftPeerId.equals(raftPeerId1);
  groupLeaderMap.put(groupId, isLocalLeader);

  // Only the new leader pushes an immediate report.
  if (context == null || !isLocalLeader) {
    return;
  }
  context.addReport(context.getParent().getContainer().getPipelineReport());
  context.getParent().triggerHeartbeat();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ message ContainerAction {

message PipelineReport {
  required PipelineID pipelineID = 1;
  // True when the reporting datanode is currently the leader of this
  // pipeline. Declared optional (defaulting to false) rather than required:
  // a report from a sender that predates this field omits it, and a missing
  // required field would make the whole message fail to parse.
  optional bool isLeader = 2 [default = false];
}

message PipelineReportsProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,32 @@

package org.apache.hadoop.hdds.scm.pipeline;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;
import com.google.common.base.Preconditions;

/**
* Handles Pipeline Reports from datanode.
*/
public class PipelineReportHandler implements
EventHandler<PipelineReportFromDatanode> {

private static final Logger LOGGER = LoggerFactory
.getLogger(PipelineReportHandler.class);
private static final Logger LOGGER = LoggerFactory.getLogger(
PipelineReportHandler.class);
private final PipelineManager pipelineManager;
private final Configuration conf;
private final SCMSafeModeManager scmSafeModeManager;
Expand All @@ -62,7 +60,6 @@ public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
this.pipelineAvailabilityCheck = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);

}

@Override
Expand All @@ -72,8 +69,8 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
DatanodeDetails dn = pipelineReportFromDatanode.getDatanodeDetails();
PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
Preconditions.checkNotNull(dn, "Pipeline Report is "
+ "missing DatanodeDetails.");
Preconditions.checkNotNull(dn,
"Pipeline Report is missing DatanodeDetails.");
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Processing pipeline report for dn: {}", dn);
}
Expand All @@ -89,7 +86,6 @@ public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
pipelineReportFromDatanode);
}

}

private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
Expand All @@ -104,16 +100,24 @@ private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
return;
}

pipeline.reportDatanode(dn);
if (report.getIsLeader()) {
pipeline.setLeaderId(dn.getUuid());
}
if ((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED)
&& pipeline.isHealthy()) {
pipelineManager.openPipeline(pipelineID);
}

if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn);
pipeline.reportDatanode(dn);
if (pipeline.isHealthy()) {
// if all the dns have reported, pipeline can be moved to OPEN state


if (report.getIsLeader()) {
// Pipeline reported as the leader
pipeline.setLeaderId(dn.getUuid());
pipelineManager.openPipeline(pipelineID);
}
} else {
// In OPEN state case just report the datanode
pipeline.reportDatanode(dn);
}
pipeline.reportDatanode(dn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ class PipelineStateManager {

void addPipeline(Pipeline pipeline) throws IOException {
pipelineStateMap.addPipeline(pipeline);
if (pipeline.getPipelineState() == PipelineState.OPEN) {
LOG.info("Created pipeline " + pipeline);
}
LOG.info("Created pipeline " + pipeline);
}

void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
Expand Down Expand Up @@ -131,8 +129,8 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
throw new IOException("Closed pipeline can not be opened");
}
if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
pipeline = pipelineStateMap.updatePipelineState(
pipelineId, PipelineState.OPEN);
LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
}
return pipeline;
Expand Down
Loading