Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -77,11 +76,6 @@ public void setIsLeader(boolean isLeader) {
this.isLeader = isLeader;
}

@Override
public RaftPeer getSuggestedLeader() {
throw new UnsupportedOperationException();
}

/**
* {@inheritDoc}
*/
Expand All @@ -98,24 +92,6 @@ public void shutdown() throws IOException {
ratisServer.stop();
}

@Override
public List<String> getRatisRoles() {
return Arrays.asList(
"180.3.14.5:9865",
"180.3.14.21:9865",
"180.3.14.145:9865");
}

/**
* {@inheritDoc}
*/
@Override
public NotLeaderException triggerNotLeaderException() {
return new NotLeaderException(RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
null, new ArrayList<>());
}

private class MockRatisServer implements SCMRatisServer {

private Map<RequestType, Object> handlers =
Expand Down Expand Up @@ -205,23 +181,27 @@ private Message process(final SCMRatisRequest request)
}

@Override
public RaftServer getServer() {
return null;
public void stop() {
}

@Override
public RaftGroupId getRaftGroupId() {
public RaftServer.Division getDivision() {
return null;
}

@Override
public List<RaftPeer> getRaftPeers() {
return new ArrayList<>();
public List<String> getRatisRoles() {
return Arrays.asList(
"180.3.14.5:9865",
"180.3.14.21:9865",
"180.3.14.145:9865");
}

@Override
public void stop() {
public NotLeaderException triggerNotLeaderException() {
return new NotLeaderException(RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
null, new ArrayList<>());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.hadoop.hdds.scm.ha;

import java.util.List;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;

import java.io.IOException;
import java.util.Optional;

Expand Down Expand Up @@ -48,23 +44,8 @@ public interface SCMHAManager {
*/
SCMRatisServer getRatisServer();

/**
* Returns suggested leader from RaftServer.
*/
RaftPeer getSuggestedLeader();

/**
* Stops the HA service.
*/
void shutdown() throws IOException;

/**
* Returns roles of ratis peers.
*/
List<String> getRatisRoles();

/**
* Returns NotLeaderException with useful info.
*/
NotLeaderException triggerNotLeaderException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,8 @@
package org.apache.hadoop.hdds.scm.ha;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
Expand Down Expand Up @@ -77,13 +71,14 @@ public Optional<Long> isLeader() {
// When SCM HA is not enabled, the current SCM is always the leader.
return Optional.of((long)0);
}
RaftServer server = ratisServer.getServer();
RaftServer server = ratisServer.getDivision().getRaftServer();
Preconditions.checkState(server instanceof RaftServerProxy);
try {
// SCM only has one raft group.
RaftServerImpl serverImpl = ((RaftServerProxy) server)
.getImpl(ratisServer.getRaftGroupId());
.getImpl(ratisServer.getDivision().getGroup().getGroupId());
if (serverImpl != null) {
// TODO: getRoleInfoProto() will be exposed from Division later.
RaftProtos.RoleInfoProto roleInfoProto = serverImpl.getRoleInfoProto();
return roleInfoProto.hasLeaderInfo()
? Optional.of(roleInfoProto.getLeaderInfo().getTerm())
Expand All @@ -104,68 +99,11 @@ public SCMRatisServer getRatisServer() {
return ratisServer;
}

private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
if (serverImpl.isLeader()) {
return RaftPeerId.getRaftPeerId(
serverImpl.getRoleInfoProto().getLeaderInfo().toString());
} else if (serverImpl.isFollower()) {
return RaftPeerId.valueOf(
serverImpl.getRoleInfoProto().getFollowerInfo()
.getLeaderInfo().getId().getId());
} else {
return null;
}
}

@Override
public RaftPeer getSuggestedLeader() {
RaftServer server = ratisServer.getServer();
Preconditions.checkState(server instanceof RaftServerProxy);
RaftServerImpl serverImpl = null;
try {
// SCM only has one raft group.
serverImpl = ((RaftServerProxy) server)
.getImpl(ratisServer.getRaftGroupId());
if (serverImpl != null) {
RaftPeerId peerId = getPeerIdFromRoleInfo(serverImpl);
if (peerId != null) {
return RaftPeer.newBuilder().setId(peerId).build();
}
return null;
}
} catch (IOException ioe) {
LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
"whether it's leader. ", ioe);
}
return null;
}

/**
* {@inheritDoc}
*/
@Override
public void shutdown() throws IOException {
ratisServer.stop();
}

@Override
public List<String> getRatisRoles() {
return getRatisServer()
.getRaftPeers()
.stream()
.map(peer -> peer.getAddress() == null ? "" : peer.getAddress())
.collect(Collectors.toList());
}

/**
* {@inheritDoc}
*/
@Override
public NotLeaderException triggerNotLeaderException() {
return new NotLeaderException(RaftGroupMemberId.valueOf(
ratisServer.getServer().getId(),
ratisServer.getRaftGroupId()),
getSuggestedLeader(),
ratisServer.getRaftPeers());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;

import java.io.IOException;
Expand All @@ -40,9 +39,15 @@ SCMRatisResponse submitRequest(SCMRatisRequest request)

void stop() throws IOException;

RaftServer getServer();
RaftServer.Division getDivision();

RaftGroupId getRaftGroupId();
/**
* Returns roles of ratis peers.
*/
List<String> getRatisRoles();

List<RaftPeer> getRaftPeers();
/**
* Returns NotLeaderException with useful info.
*/
NotLeaderException triggerNotLeaderException();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand All @@ -42,6 +41,7 @@
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,12 +53,8 @@ public class SCMRatisServerImpl implements SCMRatisServer {
private static final Logger LOG =
LoggerFactory.getLogger(SCMRatisServerImpl.class);

private final RaftServer.Division division;
private final InetSocketAddress address;
private final RaftServer server;
private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId;
private final SCMStateMachine scmStateMachine;
private final ClientId clientId = ClientId.randomId();
private final AtomicLong callId = new AtomicLong();

Expand All @@ -69,41 +65,49 @@ public class SCMRatisServerImpl implements SCMRatisServer {
throws IOException {
this.address = haConf.getRatisBindAddress();

SCMHAGroupBuilder scmHAGroupBuilder = new SCMHAGroupBuilder(haConf, conf);
this.raftPeerId = scmHAGroupBuilder.getPeerId();
this.raftGroupId = scmHAGroupBuilder.getRaftGroupId();
this.raftGroup = scmHAGroupBuilder.getRaftGroup();
SCMHAGroupBuilder haGrpBuilder = new SCMHAGroupBuilder(haConf, conf);

final RaftProperties serverProperties = RatisUtil
.newRaftProperties(haConf, conf);
this.scmStateMachine = new SCMStateMachine();
this.server = RaftServer.newBuilder()
.setServerId(raftPeerId)
.setGroup(raftGroup)

RaftServer server = RaftServer.newBuilder()
.setServerId(haGrpBuilder.getPeerId())
.setGroup(haGrpBuilder.getRaftGroup())
.setProperties(serverProperties)
.setStateMachine(scmStateMachine)
.setStateMachine(new SCMStateMachine())
.build();

this.division = server.getDivision(haGrpBuilder.getRaftGroupId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to guarantee division is not null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You won't get a null, instead, the call will throw a GroupMismatchException in this situation.

}

@Override
public void start() throws IOException {
server.start();
division.getRaftServer().start();
}

@Override
public void registerStateMachineHandler(final RequestType handlerType,
final Object handler) {
scmStateMachine.registerHandler(handlerType, handler);
((SCMStateMachine) division.getStateMachine())
.registerHandler(handlerType, handler);
}

@Override
public SCMRatisResponse submitRequest(SCMRatisRequest request)
throws IOException, ExecutionException, InterruptedException {
final RaftClientRequest raftClientRequest = new RaftClientRequest(
clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
RaftClientRequest.writeRequestType(), null);
final RaftClientRequest raftClientRequest =
new RaftClientRequest(
clientId,
division.getId(),
division.getGroup().getGroupId(),
nextCallId(),
request.encode(),
RaftClientRequest.writeRequestType(),
null);
final RaftClientReply raftClientReply =
server.submitClientRequestAsync(raftClientRequest).get();
division.getRaftServer()
.submitClientRequestAsync(raftClientRequest)
.get();
return SCMRatisResponse.decode(raftClientReply);
}

Expand All @@ -113,26 +117,30 @@ private long nextCallId() {

@Override
public void stop() throws IOException {
server.close();
division.getRaftServer().close();
}

@Override
public RaftServer getServer() {
return server;
public RaftServer.Division getDivision() {
return division;
}

@Override
public RaftGroupId getRaftGroupId() {
return raftGroupId;
public List<String> getRatisRoles() {
return division.getGroup().getPeers().stream()
.map(peer -> peer.getAddress() == null ? "" : peer.getAddress())
.collect(Collectors.toList());
}

/**
* {@inheritDoc}
*/
@Override
public List<RaftPeer> getRaftPeers() {
return Collections.singletonList(RaftPeer.newBuilder()
.setId(raftPeerId).build());
public NotLeaderException triggerNotLeaderException() {
return new NotLeaderException(
division.getMemberId(), null, division.getGroup().getPeers());
}


/**
* If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
* its raft peers should locate on different nodes, and use the same port
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,7 @@ public void onMessage(SCMSafeModeManager.SafeModeStatus status,
startPipelineCreator();
}
} catch (NotLeaderException ex) {
LOG.warn("Not the current leader SCM and cannot process pipeline" +
" creation. Suggested leader is: ",
scmhaManager.getSuggestedLeader().getAddress());
LOG.warn("Not leader SCM, cannot process pipeline creation.");
}

}
Expand Down
Loading