Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync ReadOnly {}", message);
}
return getClient().sendReadOnlyAsync(message);
return getClient().async().sendReadOnly(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("sendCommandAsync {}", message);
}
return getClient().sendAsync(message);
return getClient().async().send(message);
}

}
Expand Down Expand Up @@ -258,17 +258,17 @@ public XceiverClientReply watchForCommit(long index)
}
RaftClientReply reply;
try {
CompletableFuture<RaftClientReply> replyFuture = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
CompletableFuture<RaftClientReply> replyFuture = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
replyFuture.get();
} catch (Exception e) {
Throwable t = HddsClientUtils.checkForException(e);
LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
if (t instanceof GroupMismatchException) {
throw e;
}
reply = getClient()
.sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
reply = getClient().async()
.watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
.get();
List<RaftProtos.CommitInfoProto> commitInfoProtoList =
reply.getCommitInfos().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,18 @@ public static RaftPeerId toRaftPeerId(DatanodeDetails id) {
}

public static RaftPeer toRaftPeer(DatanodeDetails id) {
return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
return RaftPeer.newBuilder()
.setId(toRaftPeerId(id))
.setAddress(toRaftPeerAddressString(id))
.build();
}

public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) {
return new RaftPeer(
toRaftPeerId(id), toRaftPeerAddressString(id), priority);
return RaftPeer.newBuilder()
.setId(toRaftPeerId(id))
.setAddress(toRaftPeerAddressString(id))
.setPriority(priority)
.build();
}

private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer,
final RaftPeer peer = RatisHelper.toRaftPeer(d);
try (RaftClient client = RatisHelper.newRaftClient(peer, conf,
ozoneContainer.getTlsClientConfig())) {
client.groupAdd(group, peer.getId());
client.getGroupManagementApi(peer.getId()).add(group);
} catch (AlreadyExistsException ae) {
// do not log
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ private synchronized void updateLastApplied() {
* @param index index of the log entry
*/
@Override
public void notifyIndexUpdate(long term, long index) {
public void notifyTermIndexUpdated(long term, long index) {
applyTransactionCompletionMap.put(index, term);
// We need to call updateLastApplied here because now in ratis when a
// node becomes leader, it is checking stateMachineIndex >=
Expand Down Expand Up @@ -844,7 +844,7 @@ public void evictStateMachineCache() {
}

@Override
public void notifySlowness(RoleInfoProto roleInfoProto) {
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) {
ratisServer.handleNodeSlowness(gid, roleInfoProto);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
Expand Down Expand Up @@ -64,15 +65,19 @@ public class TestCreatePipelineCommandHandler {
private StateContext stateContext;
private SCMConnectionManager connectionManager;
private RaftClient raftClient;
private GroupManagementApi raftClientGroupManager;

@Before
public void setup() throws Exception {
ozoneContainer = Mockito.mock(OzoneContainer.class);
stateContext = Mockito.mock(StateContext.class);
connectionManager = Mockito.mock(SCMConnectionManager.class);
raftClient = Mockito.mock(RaftClient.class);
raftClientGroupManager = Mockito.mock(GroupManagementApi.class);
final RaftClient.Builder builder = mockRaftClientBuilder();
Mockito.when(builder.build()).thenReturn(raftClient);
Mockito.when(raftClient.getGroupManagementApi(
Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
PowerMockito.mockStatic(RaftClient.class);
PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder);
}
Expand Down Expand Up @@ -121,8 +126,8 @@ public void testPipelineCreation() throws IOException {
Mockito.verify(writeChanel, Mockito.times(1))
.addGroup(pipelineID.getProtobuf(), datanodes, priorityList);

Mockito.verify(raftClient, Mockito.times(2))
.groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class));
Mockito.verify(raftClientGroupManager, Mockito.times(2))
.add(Mockito.any(RaftGroup.class));
}

@Test
Expand Down Expand Up @@ -150,8 +155,8 @@ public void testCommandIdempotency() throws IOException {
Mockito.verify(writeChanel, Mockito.times(0))
.addGroup(pipelineID.getProtobuf(), datanodes);

Mockito.verify(raftClient, Mockito.times(0))
.groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class));
Mockito.verify(raftClientGroupManager, Mockito.times(0))
.add(Mockito.any(RaftGroup.class));
}

private List<DatanodeDetails> getDatanodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setNoLeaderTimeout(Duration duration) {
this.noLeaderTimeout = duration.toMillis();
}

@Config(key = "rpcslowness.timeout",
@Config(key = "rpc.slowness.timeout",
defaultValue = "300s",
type = ConfigType.TIME,
tags = {OZONE, DATANODE, RATIS},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
try(RaftClient client = RatisHelper
.newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
retryPolicy, grpcTlsConfig, ozoneConf)) {
client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
true, false, p.getId());
client.getGroupManagementApi(p.getId())
.remove(RaftGroupId.valueOf(pipelineID.getId()), true, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ static void initXceiverServerRatis(
final OzoneConfiguration conf = new OzoneConfiguration();
final RaftClient client =
newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf);
client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
client.getGroupManagementApi(p.getId())
.add(RatisHelper.newRaftGroup(pipeline));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,10 @@ public static OzoneManagerRatisServer newOMRatisServer(
InetSocketAddress ratisAddr = new InetSocketAddress(
omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort());

RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
RaftPeer localRaftPeer = RaftPeer.newBuilder()
.setId(localRaftPeerId)
.setAddress(ratisAddr)
.build();

List<RaftPeer> raftPeers = new ArrayList<>();
// Add this Ratis server to the Ratis ring
Expand All @@ -322,11 +325,17 @@ public static OzoneManagerRatisServer newOMRatisServer(
RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
RaftPeer raftPeer;
if (peerInfo.isHostUnresolved()) {
raftPeer = new RaftPeer(raftPeerId, peerInfo.getRatisHostPortStr());
raftPeer = RaftPeer.newBuilder()
.setId(raftPeerId)
.setAddress(peerInfo.getRatisHostPortStr())
.build();
} else {
InetSocketAddress peerRatisAddr = new InetSocketAddress(
peerInfo.getInetAddress(), peerInfo.getRatisPort());
raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
raftPeer = RaftPeer.newBuilder()
.setId(raftPeerId)
.setAddress(peerRatisAddr)
.build();
}

// Add other OM nodes belonging to the same OM service to the Ratis ring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public SnapshotInfo getLatestSnapshot() {
* @param index index which is being updated
*/
@Override
public void notifyIndexUpdate(long currentTerm, long index) {
public void notifyTermIndexUpdated(long currentTerm, long index) {
// SnapshotInfo should be updated when the term changes.
// The index here refers to the log entry index and the index in
// SnapshotInfo represents the snapshotIndex i.e. the index of the last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setup() throws Exception {
Mockito.mock(OMRatisSnapshotInfo.class));
ozoneManagerStateMachine =
new OzoneManagerStateMachine(ozoneManagerRatisServer, false);
ozoneManagerStateMachine.notifyIndexUpdate(0, 0);
ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0);
}

@Test
Expand All @@ -70,7 +70,7 @@ public void testLastAppliedIndex() {
// Happy scenario.

// Conf/metadata transaction.
ozoneManagerStateMachine.notifyIndexUpdate(0, 1);
ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
Assert.assertEquals(0,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
Assert.assertEquals(1,
Expand All @@ -94,7 +94,7 @@ public void testLastAppliedIndex() {
ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex());

// Conf/metadata transaction.
ozoneManagerStateMachine.notifyIndexUpdate(0L, 4L);
ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 4L);

Assert.assertEquals(0L,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {
// lastAppliedIndex as 4 or not.

// Conf/metadata transaction.
ozoneManagerStateMachine.notifyIndexUpdate(0, 1);
ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1);
Assert.assertEquals(0,
ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm());
Assert.assertEquals(1,
Expand All @@ -143,7 +143,7 @@ public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() {


// Conf/metadata transaction.
ozoneManagerStateMachine.notifyIndexUpdate(0L, 5L);
ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 5L);

// Still it should be zero, as for 2,3,4 updateLastAppliedIndex is not yet
// called so the lastAppliedIndex will be at older value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,19 @@ private void configureGroup() throws IOException {
RaftPeerId.getRaftPeerId(serverId);

RaftGroup group = RaftGroup.valueOf(groupId,
new RaftPeer(RaftPeerId.valueOf(serverId), serverAddress),
new RaftPeer(RaftPeerId.valueOf(FAKE_LEADER_ID),
FAKE_LEADER_ADDDRESS));
RaftPeer.newBuilder().setId(serverId).setAddress(serverAddress).build(),
RaftPeer.newBuilder()
.setId(RaftPeerId.valueOf(FAKE_LEADER_ID))
.setAddress(FAKE_LEADER_ADDDRESS)
.build());
RaftClient client = RaftClient.newBuilder()
.setClientId(clientId)
.setProperties(new RaftProperties(true))
.setRaftGroup(group)
.build();

RaftClientReply raftClientReply = client.groupAdd(group, peerId);
RaftClientReply raftClientReply = client.getGroupManagementApi(peerId)
.add(group);

LOG.info(
"Group is configured in the RAFT server (one follower, one fake "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,26 @@ private void configureGroup() throws IOException {
RaftPeerId.getRaftPeerId(serverId);

RaftGroup group = RaftGroup.valueOf(groupId,
new RaftPeer(RaftPeerId.valueOf(serverId), serverAddress),
new RaftPeer(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1),
FAKE_LEADER_ADDDRESS1),
new RaftPeer(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1),
FAKE_LEADER_ADDDRESS2));
RaftPeer.newBuilder()
.setId(serverId)
.setAddress(serverAddress)
.build(),
RaftPeer.newBuilder()
.setId(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1))
.setAddress(FAKE_LEADER_ADDDRESS1)
.build(),
RaftPeer.newBuilder()
.setId(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1))
.setAddress(FAKE_LEADER_ADDDRESS2)
.build());
RaftClient client = RaftClient.newBuilder()
.setClientId(clientId)
.setProperties(new RaftProperties(true))
.setRaftGroup(group)
.build();

RaftClientReply raftClientReply = client.groupAdd(group, peerId);
RaftClientReply raftClientReply = client.getGroupManagementApi(peerId)
.add(group);

LOG.info(
"Group is configured in the RAFT server (with two fake leader leader)"
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<declared.ozone.version>${ozone.version}</declared.ozone.version>

<!-- Apache Ratis version -->
<ratis.version>1.1.0-11689cd-SNAPSHOT</ratis.version>
<ratis.version>1.1.0-913f5a4-SNAPSHOT</ratis.version>

<!-- Apache Ratis thirdparty version -->
<ratis.thirdparty.version>0.6.0-SNAPSHOT</ratis.thirdparty.version>
Expand Down