Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
Expand Down Expand Up @@ -220,8 +219,7 @@ public void initialize(
private long loadSnapshot(SingleFileSnapshotInfo snapshot)
throws IOException {
if (snapshot == null) {
TermIndex empty =
TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX);
TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
LOG.info("{}: The snapshot info is null. Setting the last applied index" +
"to:{}", gid, empty);
setLastAppliedTermIndex(empty);
Expand Down Expand Up @@ -420,10 +418,9 @@ private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
final WriteChunkRequestProto write = requestProto.getWriteChunk();
RaftServer server = ratisServer.getServer();
Preconditions.checkState(server instanceof RaftServerProxy);
try {
if (((RaftServerProxy) server).getImpl(gid).isLeader()) {
RaftServer.Division division = ratisServer.getServerDivision();
if (division.getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
} catch (InterruptedException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -460,9 +459,9 @@ public void start() throws IOException {
}
server.start();

int realPort =
((RaftServerProxy) server).getServerRpc().getInetSocketAddress()
.getPort();
int realPort = server.getServerRpc()
.getInetSocketAddress()
.getPort();

if (port == 0) {
LOG.info("{} {} is started using port {}", getClass().getSimpleName(),
Expand Down Expand Up @@ -516,6 +515,15 @@ public RaftServer getServer() {
return server;
}

public RaftServer.Division getServerDivision() throws IOException {
return getServerDivision(server.getGroupIds().iterator().next());
}

public RaftServer.Division getServerDivision(RaftGroupId id)
throws IOException {
return server.getDivision(id);
}

private void processReply(RaftClientReply reply) throws IOException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. The request will be rejected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,12 @@
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.security.token.Token;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.junit.Assert;
import org.slf4j.Logger;
Expand Down Expand Up @@ -583,35 +580,33 @@ public static String getFixedLengthString(String string, int length) {
return String.format("%1$" + length + "s", string);
}

private static RaftServerImpl getRaftServerImpl(HddsDatanodeService dn,
Pipeline pipeline) throws Exception {
private static RaftServer.Division getRaftServerDivision(
HddsDatanodeService dn, Pipeline pipeline) throws Exception {
if (!pipeline.getNodes().contains(dn.getDatanodeDetails())) {
throw new IllegalArgumentException("Pipeline:" + pipeline.getId() +
" not exist in datanode:" + dn.getDatanodeDetails().getUuid());
}

XceiverServerSpi server = dn.getDatanodeStateMachine().
getContainer().getWriteChannel();
RaftServerProxy proxy =
(RaftServerProxy) (((XceiverServerRatis) server).getServer());
RaftGroupId groupId =
pipeline == null ? proxy.getGroupIds().iterator().next() :
RatisHelper.newRaftGroup(pipeline).getGroupId();
return proxy.getImpl(groupId);
XceiverServerRatis server =
(XceiverServerRatis) (dn.getDatanodeStateMachine().
getContainer().getWriteChannel());
return pipeline == null ? server.getServerDivision() :
server.getServerDivision(
RatisHelper.newRaftGroup(pipeline).getGroupId());
}

public static StateMachine getStateMachine(HddsDatanodeService dn,
Pipeline pipeline) throws Exception {
return getRaftServerImpl(dn, pipeline).getStateMachine();
return getRaftServerDivision(dn, pipeline).getStateMachine();
}

public static boolean isRatisLeader(HddsDatanodeService dn, Pipeline pipeline)
throws Exception {
return getRaftServerImpl(dn, pipeline).isLeader();
return getRaftServerDivision(dn, pipeline).getInfo().isLeader();
}

public static boolean isRatisFollower(HddsDatanodeService dn,
Pipeline pipeline) throws Exception {
return getRaftServerImpl(dn, pipeline).isFollower();
return getRaftServerDivision(dn, pipeline).getInfo().isFollower();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@

import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.junit.Assert;
import org.slf4j.Logger;
Expand Down Expand Up @@ -324,21 +322,19 @@ public static StateMachine getStateMachine(MiniOzoneCluster cluster)
return getStateMachine(cluster.getHddsDatanodes().get(0), null);
}

private static RaftServerImpl getRaftServerImpl(HddsDatanodeService dn,
Pipeline pipeline) throws Exception {
XceiverServerSpi server = dn.getDatanodeStateMachine().
getContainer().getWriteChannel();
RaftServerProxy proxy =
(RaftServerProxy) (((XceiverServerRatis) server).getServer());
RaftGroupId groupId =
pipeline == null ? proxy.getGroupIds().iterator().next() :
RatisHelper.newRaftGroup(pipeline).getGroupId();
return proxy.getImpl(groupId);
private static RaftServer.Division getRaftServerDivision(
HddsDatanodeService dn, Pipeline pipeline) throws Exception {
XceiverServerRatis server =
(XceiverServerRatis) (dn.getDatanodeStateMachine().
getContainer().getWriteChannel());
return pipeline == null ? server.getServerDivision() :
server.getServerDivision(
RatisHelper.newRaftGroup(pipeline).getGroupId());
}

public static StateMachine getStateMachine(HddsDatanodeService dn,
Pipeline pipeline) throws Exception {
return getRaftServerImpl(dn, pipeline).getStateMachine();
return getRaftServerDivision(dn, pipeline).getStateMachine();
}

public static HddsDatanodeService getDatanodeService(OmKeyLocationInfo info,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -53,7 +53,7 @@ public void setup() {
path = GenericTestUtils
.getTempPath(TestOzoneClientKeyGenerator.class.getSimpleName());
GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
File baseDir = new File(path);
baseDir.mkdirs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
import java.util.LinkedList;
import org.junit.Assert;
Expand Down Expand Up @@ -57,7 +57,7 @@ public void setup() {
path = GenericTestUtils
.getTempPath(TestOzoneClientKeyGenerator.class.getSimpleName());
GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
File baseDir = new File(path);
baseDir.mkdirs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.hadoop.test.GenericTestUtils;

import org.apache.commons.io.FileUtils;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -48,7 +48,7 @@ public void setup() {
path = GenericTestUtils
.getTempPath(TestOzoneClientKeyGenerator.class.getSimpleName());
GenericTestUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(RaftServer.LOG, Level.DEBUG);
File baseDir = new File(path);
baseDir.mkdirs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void testInstallSnapshot() throws Exception {
OMTransactionInfo omTransactionInfo =
OMTransactionInfo.readTransactionInfo(leaderOM.getMetadataManager());
TermIndex leaderOMTermIndex =
TermIndex.newTermIndex(omTransactionInfo.getTerm(),
TermIndex.valueOf(omTransactionInfo.getTerm(),
omTransactionInfo.getTransactionIndex());
long leaderOMSnaphsotIndex = leaderOMTermIndex.getIndex();
long leaderOMSnapshotTermIndex = leaderOMTermIndex.getTerm();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public long getTransactionIndex() {
}

public TermIndex getTermIndex() {
return TermIndex.newTermIndex(term, transactionIndex);
return TermIndex.valueOf(term, transactionIndex);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3338,7 +3338,7 @@ TermIndex installCheckpoint(String leaderId, Path checkpointLocation,

// TODO: We should only return the snpashotIndex to the leader.
// Should be fixed after RATIS-586
TermIndex newTermIndex = TermIndex.newTermIndex(term, lastAppliedIndex);
TermIndex newTermIndex = TermIndex.valueOf(term, lastAppliedIndex);
return newTermIndex;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void updateTermIndex(long newTerm, long newIndex) {

@Override
public TermIndex getTermIndex() {
return TermIndex.newTermIndex(term, snapshotIndex);
return TermIndex.valueOf(term, snapshotIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
Expand Down Expand Up @@ -574,14 +572,12 @@ public enum RaftServerStatus {
* @return RaftServerStatus.
*/
public RaftServerStatus checkLeaderStatus() {
Preconditions.checkState(server instanceof RaftServerProxy);
RaftServerImpl serverImpl;
try {
serverImpl = ((RaftServerProxy) server).getImpl(raftGroupId);
if (serverImpl != null) {
if (!serverImpl.isLeader()) {
RaftServer.Division division = server.getDivision(raftGroupId);
if (division != null) {
if (!division.getInfo().isLeader()) {
return RaftServerStatus.NOT_LEADER;
} else if (serverImpl.isLeaderReady()) {
} else if (division.getInfo().isLeaderReady()) {
return RaftServerStatus.LEADER_AND_READY;
} else {
return RaftServerStatus.LEADER_AND_NOT_READY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void unpause(long newLastAppliedSnaphsotIndex,
getLifeCycle().startAndTransition(() -> {
this.ozoneManagerDoubleBuffer = buildDoubleBufferForRatis();
handler.updateDoubleBuffer(ozoneManagerDoubleBuffer);
this.setLastAppliedTermIndex(TermIndex.newTermIndex(
this.setLastAppliedTermIndex(TermIndex.valueOf(
newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
});
}
Expand Down Expand Up @@ -509,7 +509,7 @@ public void loadSnapshotInfoFromDB() throws IOException {
OMTransactionInfo.readTransactionInfo(
ozoneManager.getMetadataManager());
if (omTransactionInfo != null) {
setLastAppliedTermIndex(TermIndex.newTermIndex(
setLastAppliedTermIndex(TermIndex.valueOf(
omTransactionInfo.getTerm(),
omTransactionInfo.getTransactionIndex()));
snapshotInfo.updateTermIndex(omTransactionInfo.getTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void init() throws Exception {
folder.newFolder().getAbsolutePath());
omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
initialTermIndex = TermIndex.newTermIndex(0, 0);
initialTermIndex = TermIndex.valueOf(0, 0);
OMRatisSnapshotInfo omRatisSnapshotInfo = new OMRatisSnapshotInfo();
when(ozoneManager.getSnapshotInfo()).thenReturn(omRatisSnapshotInfo);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(conf, ozoneManager,
Expand Down Expand Up @@ -136,7 +136,7 @@ public void testLoadSnapshotInfoOnStart() throws Exception {
SnapshotInfo snapshotInfo =
omRatisServer.getOmStateMachine().getLatestSnapshot();

TermIndex newSnapshotIndex = TermIndex.newTermIndex(
TermIndex newSnapshotIndex = TermIndex.valueOf(
snapshotInfo.getTerm(), snapshotInfo.getIndex() + 100);

omMetadataManager.getTransactionInfoTable().put(TRANSACTION_INFO_KEY,
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<declared.ozone.version>${ozone.version}</declared.ozone.version>

<!-- Apache Ratis version -->
<ratis.version>1.1.0-913f5a4-SNAPSHOT</ratis.version>
<ratis.version>1.1.0-c5eafb9-SNAPSHOT</ratis.version>

<!-- Apache Ratis thirdparty version -->
<ratis.thirdparty.version>0.6.0-SNAPSHOT</ratis.thirdparty.version>
Expand Down