Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -31,6 +32,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -80,6 +82,18 @@ public class StateContext {
private boolean shutdownGracefully = false;
private final AtomicLong threadPoolNotAvailableCount;

/**
* term of latest leader SCM, extract from SCMCommand.
*
* Only leader SCM (both latest and stale) can send out SCMCommand,
* which will save its term in SCMCommand. Since latest leader SCM
* always has the highest term, term can be used to detect SCMCommand
* from stale leader SCM.
*
* For non-HA mode, term of SCMCommand will be 0.
*/
private Optional<Long> termOfLeaderSCM = Optional.empty();

/**
* Starting with a 2 sec heartbeat frequency which will be updated to the
* real HB frequency after scm registration. With this method the
Expand Down Expand Up @@ -470,6 +484,65 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
}
}

/**
* After startup, datanode needs detect latest leader SCM before handling
* any SCMCommand, so that it won't be disturbed by stale leader SCM.
*
* The rule is: after majority SCMs are in HEARTBEAT state and has
* heard from leader SCMs (commandQueue is not empty), datanode will init
* termOfLeaderSCM with the max term found in commandQueue.
*
* The init process also works for non-HA mode. In that case, term of all
* SCMCommands will be 0.
*/
private void initTermOfLeaderSCM() {
// only init once
if (termOfLeaderSCM.isPresent()) {
return;
}

AtomicInteger scmNum = new AtomicInteger(0);
AtomicInteger activeScmNum = new AtomicInteger(0);

getParent().getConnectionManager().getValues()
.forEach(endpoint -> {
if (endpoint.isPassive()) {
return;
}
scmNum.incrementAndGet();
if (endpoint.getState()
== EndpointStateMachine.EndPointStates.HEARTBEAT) {
activeScmNum.incrementAndGet();
}
});

// majority SCMs should be in HEARTBEAT state.
if (activeScmNum.get() < scmNum.get() / 2 + 1) {
return;
}

// if commandQueue is not empty, init termOfLeaderSCM
// with the largest term found in commandQueue
commandQueue.stream()
.mapToLong(SCMCommand::getTerm)
.max()
.ifPresent(term -> termOfLeaderSCM = Optional.of(term));
}

/**
* monotonically increase termOfLeaderSCM.
* Always record the latest term that has seen.
*/
private void updateTermOfLeaderSCM(SCMCommand<?> command) {
if (!termOfLeaderSCM.isPresent()) {
LOG.error("should init termOfLeaderSCM before update it.");
return;
}

termOfLeaderSCM = Optional.of(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use AtomicLong for termOfLeaderSCM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Point.
Since this is executed in a single thread, let's do this first. We surely need to consider the thread safety in future change.

Long.max(termOfLeaderSCM.get(), command.getTerm()));
}

/**
* Returns the next command or null if it is empty.
*
Expand All @@ -478,7 +551,26 @@ public void execute(ExecutorService service, long time, TimeUnit unit)
public SCMCommand getNextCommand() {
lock.lock();
try {
return commandQueue.poll();
initTermOfLeaderSCM();
if (!termOfLeaderSCM.isPresent()) {
return null; // not ready yet
}

while (true) {
SCMCommand<?> command = commandQueue.poll();
if (command == null) {
return null;
}

updateTermOfLeaderSCM(command);
if (command.getTerm() == termOfLeaderSCM.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused on when termOfLeaderSCM is updated to the newest leader term. Is termOfLeaderSCM updated in during leader selection?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean whether termOfLeaderSCM is updated in leader election of SCM ? No, it won't. Datanode detects the latest SCM term by heartbeat with SCMs, whose interval is larger than 30s.

return command;
}

LOG.warn("Detect and drop a SCMCommand {} from stale leader SCM," +
" stale term {}, latest term {}.",
command, command.getTerm(), termOfLeaderSCM.get());
}
} finally {
lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
DeleteBlocksCommand db = DeleteBlocksCommand
.getFromProtobuf(
commandResponseProto.getDeleteBlocksCommandProto());
if (commandResponseProto.hasTerm()) {
db.setTerm(commandResponseProto.getTerm());
}
if (!db.blocksTobeDeleted().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(DeletedContainerBlocksSummary
Expand All @@ -285,6 +288,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
CloseContainerCommand closeContainer =
CloseContainerCommand.getFromProtobuf(
commandResponseProto.getCloseContainerCommandProto());
if (commandResponseProto.hasTerm()) {
closeContainer.setTerm(commandResponseProto.getTerm());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container close request for container {}",
closeContainer.getContainerID());
Expand All @@ -295,6 +301,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
ReplicateContainerCommand replicateContainerCommand =
ReplicateContainerCommand.getFromProtobuf(
commandResponseProto.getReplicateContainerCommandProto());
if (commandResponseProto.hasTerm()) {
replicateContainerCommand.setTerm(commandResponseProto.getTerm());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM container replicate request for container {}",
replicateContainerCommand.getContainerID());
Expand All @@ -305,6 +314,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
DeleteContainerCommand deleteContainerCommand =
DeleteContainerCommand.getFromProtobuf(
commandResponseProto.getDeleteContainerCommandProto());
if (commandResponseProto.hasTerm()) {
deleteContainerCommand.setTerm(commandResponseProto.getTerm());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM delete container request for container {}",
deleteContainerCommand.getContainerID());
Expand All @@ -315,6 +327,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
CreatePipelineCommand createPipelineCommand =
CreatePipelineCommand.getFromProtobuf(
commandResponseProto.getCreatePipelineCommandProto());
if (commandResponseProto.hasTerm()) {
createPipelineCommand.setTerm(commandResponseProto.getTerm());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM create pipeline request {}",
createPipelineCommand.getPipelineID());
Expand All @@ -325,6 +340,9 @@ private void processResponse(SCMHeartbeatResponseProto response,
ClosePipelineCommand closePipelineCommand =
ClosePipelineCommand.getFromProtobuf(
commandResponseProto.getClosePipelineCommandProto());
if (commandResponseProto.hasTerm()) {
closePipelineCommand.setTerm(commandResponseProto.getTerm());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received SCM close pipeline request {}",
closePipelineCommand.getPipelineID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.hadoop.hdds.conf.ConfigurationSource;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class OzoneContainer {
private List<ContainerDataScanner> dataScanners;
private final BlockDeletingService blockDeletingService;
private final GrpcTlsConfig tlsClientConfig;
private final AtomicBoolean isStarted;

/**
* Construct OzoneContainer object.
Expand Down Expand Up @@ -152,6 +154,8 @@ public OzoneContainer(DatanodeDetails datanodeDetails, ConfigurationSource
TimeUnit.MILLISECONDS, config);
tlsClientConfig = RatisHelper.createTlsClientConfig(
secConf, certClient != null ? certClient.getCACertificate() : null);

isStarted = new AtomicBoolean(false);
}

public GrpcTlsConfig getTlsClientConfig() {
Expand Down Expand Up @@ -240,6 +244,10 @@ private void stopContainerScrub() {
* @throws IOException
*/
public void start(String scmId) throws IOException {
if (!isStarted.compareAndSet(false, true)) {
LOG.info("Ignore. OzoneContainer already started.");
return;
}
LOG.info("Attempting to start container services.");
startContainerScrub();
writeChannel.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@
*/
public abstract class SCMCommand<T extends GeneratedMessage> implements
IdentifiableEventPayload {
private long id;
private final long id;

// Under HA mode, holds term of underlying RaftServer iff current
// SCM is a leader, otherwise, holds term 0.
// Notes that, the first elected leader is from term 1, term 0,
// as the initial value of currentTerm, is never used under HA mode.
private long term = 0;

SCMCommand() {
this.id = HddsIdFactory.getLongId();
Expand Down Expand Up @@ -59,4 +65,18 @@ public long getId() {
return id;
}

/**
* Get term of this command.
* @return term
*/
public long getTerm() {
return term;
}

/**
* Set term of this command.
*/
public void setTerm(long term) {
this.term = term;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,12 @@ message SCMCommandProto {
optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
optional CreatePipelineCommandProto createPipelineCommandProto = 7;
optional ClosePipelineCommandProto closePipelineCommandProto = 8;

// Under HA mode, holds term of underlying RaftServer iff current
// SCM is a leader, otherwise, holds term 0.
// Notes that, the first elected leader is from term 1, term 0,
// as the initial value of currentTerm, is never used under HA mode.
optional uint64 term = 15;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import java.io.IOException;
import java.util.Optional;

/**
* SCMHAManager provides HA service for SCM.
Expand All @@ -34,9 +35,13 @@ public interface SCMHAManager {
void start() throws IOException;

/**
* Returns true if the current SCM is the leader.
* For HA mode, return an Optional that holds term of the
* underlying RaftServer iff current SCM is in leader role.
* Otherwise, return an empty optional.
*
* For non-HA mode, return an Optional that holds term 0.
*/
boolean isLeader();
Optional<Long> isLeader();

/**
* Returns RatisServer instance associated with the SCM instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand All @@ -32,6 +33,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;

/**
* SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
Expand Down Expand Up @@ -70,29 +72,28 @@ public void start() throws IOException {
* {@inheritDoc}
*/
@Override
public boolean isLeader() {
public Optional<Long> isLeader() {
if (!SCMHAUtils.isSCMHAEnabled(conf)) {
// When SCM HA is not enabled, the current SCM is always the leader.
return true;
return Optional.of((long)0);
}
RaftServer server = ratisServer.getServer();
Preconditions.checkState(server instanceof RaftServerProxy);
RaftServerImpl serverImpl = null;
try {
// SCM only has one raft group.
serverImpl = ((RaftServerProxy) server)
RaftServerImpl serverImpl = ((RaftServerProxy) server)
.getImpl(ratisServer.getRaftGroupId());
if (serverImpl != null) {
// Only when it's sure the current SCM is the leader, otherwise
// it should all return false.
return serverImpl.isLeader();
RaftProtos.RoleInfoProto roleInfoProto = serverImpl.getRoleInfoProto();
return roleInfoProto.hasLeaderInfo()
? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
: Optional.empty();
}
} catch (IOException ioe) {
LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
"whether it's leader. ", ioe);
}

return false;
return Optional.empty();
}

/**
Expand All @@ -104,11 +105,6 @@ public SCMRatisServer getRatisServer() {
}

private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
/*
TODO: Fix Me
Ratis API has changed.
RaftServerImpl#getRoleInfoProto is no more public.

if (serverImpl.isLeader()) {
return RaftPeerId.getRaftPeerId(
serverImpl.getRoleInfoProto().getLeaderInfo().toString());
Expand All @@ -119,8 +115,6 @@ private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
} else {
return null;
}
*/
return null;
}

@Override
Expand Down
Loading