diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index ced9df7fb664..6e99bf3553d4 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -217,12 +217,12 @@ private CompletableFuture sendRequestAsync( if (LOG.isDebugEnabled()) { LOG.debug("sendCommandAsync ReadOnly {}", message); } - return getClient().sendReadOnlyAsync(message); + return getClient().async().sendReadOnly(message); } else { if (LOG.isDebugEnabled()) { LOG.debug("sendCommandAsync {}", message); } - return getClient().sendAsync(message); + return getClient().async().send(message); } } @@ -258,8 +258,8 @@ public XceiverClientReply watchForCommit(long index) } RaftClientReply reply; try { - CompletableFuture replyFuture = getClient() - .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); + CompletableFuture replyFuture = getClient().async() + .watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED); replyFuture.get(); } catch (Exception e) { Throwable t = HddsClientUtils.checkForException(e); @@ -267,8 +267,8 @@ public XceiverClientReply watchForCommit(long index) if (t instanceof GroupMismatchException) { throw e; } - reply = getClient() - .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) + reply = getClient().async() + .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED) .get(); List commitInfoProtoList = reply.getCommitInfos().stream() diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 324774d7d77f..c910dd5acea8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -104,12 +104,18 @@ public static RaftPeerId toRaftPeerId(DatanodeDetails id) { } public static RaftPeer toRaftPeer(DatanodeDetails id) { - return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id)); + return RaftPeer.newBuilder() + .setId(toRaftPeerId(id)) + .setAddress(toRaftPeerAddressString(id)) + .build(); } public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) { - return new RaftPeer( - toRaftPeerId(id), toRaftPeerAddressString(id), priority); + return RaftPeer.newBuilder() + .setId(toRaftPeerId(id)) + .setAddress(toRaftPeerAddressString(id)) + .setPriority(priority) + .build(); } private static List toRaftPeers(Pipeline pipeline) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java index 4ad05de2cd48..db4bd76cc25f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java @@ -96,7 +96,7 @@ public void handle(SCMCommand command, OzoneContainer ozoneContainer, final RaftPeer peer = RatisHelper.toRaftPeer(d); try (RaftClient client = RatisHelper.newRaftClient(peer, conf, ozoneContainer.getTlsClientConfig())) { - client.groupAdd(group, peer.getId()); + client.getGroupManagementApi(peer.getId()).add(group); } catch (AlreadyExistsException ae) { // do not log } catch (IOException ioe) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 89ab976bc88e..1a87ce55e26d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -700,7 +700,7 @@ private synchronized void updateLastApplied() { * @param index index of the log entry */ @Override - public void notifyIndexUpdate(long term, long index) { + public void notifyTermIndexUpdated(long term, long index) { applyTransactionCompletionMap.put(index, term); // We need to call updateLastApplied here because now in ratis when a // node becomes leader, it is checking stateMachineIndex >= @@ -844,7 +844,7 @@ public void evictStateMachineCache() { } @Override - public void notifySlowness(RoleInfoProto roleInfoProto) { + public void notifyFollowerSlowness(RoleInfoProto roleInfoProto) { ratisServer.handleNodeSlowness(gid, roleInfoProto); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java index ede0b94de476..febd1c3bd0df 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.api.GroupManagementApi; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroup; @@ -64,6 +65,7 @@ public class TestCreatePipelineCommandHandler { private StateContext stateContext; private SCMConnectionManager connectionManager; private RaftClient raftClient; + private GroupManagementApi raftClientGroupManager; @Before public void setup() throws Exception { @@ -71,8 +73,11 @@ public void setup() throws Exception { stateContext = Mockito.mock(StateContext.class); connectionManager = Mockito.mock(SCMConnectionManager.class); raftClient = Mockito.mock(RaftClient.class); + raftClientGroupManager = Mockito.mock(GroupManagementApi.class); final RaftClient.Builder builder = mockRaftClientBuilder(); Mockito.when(builder.build()).thenReturn(raftClient); + Mockito.when(raftClient.getGroupManagementApi( + Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager); PowerMockito.mockStatic(RaftClient.class); PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder); } @@ -121,8 +126,8 @@ public void testPipelineCreation() throws IOException { Mockito.verify(writeChanel, Mockito.times(1)) .addGroup(pipelineID.getProtobuf(), datanodes, priorityList); - Mockito.verify(raftClient, Mockito.times(2)) - .groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class)); + Mockito.verify(raftClientGroupManager, Mockito.times(2)) + .add(Mockito.any(RaftGroup.class)); } @Test @@ -150,8 +155,8 @@ public void testCommandIdempotency() throws IOException { Mockito.verify(writeChanel, Mockito.times(0)) .addGroup(pipelineID.getProtobuf(), datanodes); - Mockito.verify(raftClient, Mockito.times(0)) - .groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class)); + Mockito.verify(raftClientGroupManager, Mockito.times(0)) + .add(Mockito.any(RaftGroup.class)); } private List getDatanodes() { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java index 19084f179cb8..8392789735f1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java @@ -88,7 +88,7 @@ public void setNoLeaderTimeout(Duration duration) { this.noLeaderTimeout = duration.toMillis(); } - @Config(key = "rpcslowness.timeout", + @Config(key = "rpc.slowness.timeout", defaultValue = "300s", type = ConfigType.TIME, tags = {OZONE, DATANODE, RATIS}, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index d174a89b6fe6..97bff9a6e504 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -94,8 +94,8 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID, try(RaftClient client = RatisHelper .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p, retryPolicy, grpcTlsConfig, ozoneConf)) { - client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()), - true, false, p.getId()); + client.getGroupManagementApi(p.getId()) + .remove(RaftGroupId.valueOf(pipelineID.getId()), true, false); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java index 535ca91b4903..668d694ea863 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java @@ -122,6 +122,7 @@ static void initXceiverServerRatis( final OzoneConfiguration conf = new OzoneConfiguration(); final RaftClient client = newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf); - client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId()); + client.getGroupManagementApi(p.getId()) + .add(RatisHelper.newRaftGroup(pipeline)); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java index 0b5a2b124342..0bf58ba4d1f3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java @@ -311,7 +311,10 @@ public static OzoneManagerRatisServer newOMRatisServer( InetSocketAddress ratisAddr = new InetSocketAddress( omNodeDetails.getInetAddress(), omNodeDetails.getRatisPort()); - RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr); + RaftPeer localRaftPeer = RaftPeer.newBuilder() + .setId(localRaftPeerId) + .setAddress(ratisAddr) + .build(); List raftPeers = new ArrayList<>(); // Add this Ratis server to the Ratis ring @@ -322,11 +325,17 @@ public static OzoneManagerRatisServer newOMRatisServer( RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId); RaftPeer raftPeer; if (peerInfo.isHostUnresolved()) { - raftPeer = new RaftPeer(raftPeerId, peerInfo.getRatisHostPortStr()); + raftPeer = RaftPeer.newBuilder() + .setId(raftPeerId) + .setAddress(peerInfo.getRatisHostPortStr()) + .build(); } else { InetSocketAddress peerRatisAddr = new InetSocketAddress( peerInfo.getInetAddress(), peerInfo.getRatisPort()); - raftPeer = new RaftPeer(raftPeerId, peerRatisAddr); + raftPeer = RaftPeer.newBuilder() + .setId(raftPeerId) + .setAddress(peerRatisAddr) + .build(); } // Add other OM nodes belonging to the same OM service to the Ratis ring diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java index acd637536917..aaf94e9b8c56 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java @@ -149,7 +149,7 @@ public SnapshotInfo getLatestSnapshot() { * @param index index which is being updated */ @Override - public void notifyIndexUpdate(long currentTerm, long index) { + public void notifyTermIndexUpdated(long currentTerm, long index) { // SnapshotInfo should be updated when the term changes. // The index here refers to the log entry index and the index in // SnapshotInfo represents the snapshotIndex i.e. the index of the last diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java index 5a60f7cb6a4b..285c992ee5c4 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerStateMachine.java @@ -61,7 +61,7 @@ public void setup() throws Exception { Mockito.mock(OMRatisSnapshotInfo.class)); ozoneManagerStateMachine = new OzoneManagerStateMachine(ozoneManagerRatisServer, false); - ozoneManagerStateMachine.notifyIndexUpdate(0, 0); + ozoneManagerStateMachine.notifyTermIndexUpdated(0, 0); } @Test @@ -70,7 +70,7 @@ public void testLastAppliedIndex() { // Happy scenario. // Conf/metadata transaction. - ozoneManagerStateMachine.notifyIndexUpdate(0, 1); + ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1); Assert.assertEquals(0, ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm()); Assert.assertEquals(1, @@ -94,7 +94,7 @@ public void testLastAppliedIndex() { ozoneManagerStateMachine.getLastAppliedTermIndex().getIndex()); // Conf/metadata transaction. - ozoneManagerStateMachine.notifyIndexUpdate(0L, 4L); + ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 4L); Assert.assertEquals(0L, ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm()); @@ -128,7 +128,7 @@ public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() { // lastAppliedIndex as 4 or not. // Conf/metadata transaction. - ozoneManagerStateMachine.notifyIndexUpdate(0, 1); + ozoneManagerStateMachine.notifyTermIndexUpdated(0, 1); Assert.assertEquals(0, ozoneManagerStateMachine.getLastAppliedTermIndex().getTerm()); Assert.assertEquals(1, @@ -143,7 +143,7 @@ public void testApplyTransactionsUpdateLastAppliedIndexCalledLate() { // Conf/metadata transaction. - ozoneManagerStateMachine.notifyIndexUpdate(0L, 5L); + ozoneManagerStateMachine.notifyTermIndexUpdated(0L, 5L); // Still it should be zero, as for 2,3,4 updateLastAppliedIndex is not yet // called so the lastAppliedIndex will be at older value. diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.java index c96c8a3da0f3..b6de21811185 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.java @@ -327,16 +327,19 @@ private void configureGroup() throws IOException { RaftPeerId.getRaftPeerId(serverId); RaftGroup group = RaftGroup.valueOf(groupId, - new RaftPeer(RaftPeerId.valueOf(serverId), serverAddress), - new RaftPeer(RaftPeerId.valueOf(FAKE_LEADER_ID), - FAKE_LEADER_ADDDRESS)); + RaftPeer.newBuilder().setId(serverId).setAddress(serverAddress).build(), + RaftPeer.newBuilder() + .setId(RaftPeerId.valueOf(FAKE_LEADER_ID)) + .setAddress(FAKE_LEADER_ADDDRESS) + .build()); RaftClient client = RaftClient.newBuilder() .setClientId(clientId) .setProperties(new RaftProperties(true)) .setRaftGroup(group) .build(); - RaftClientReply raftClientReply = client.groupAdd(group, peerId); + RaftClientReply raftClientReply = client.getGroupManagementApi(peerId) + .add(group); LOG.info( "Group is configured in the RAFT server (one follower, one fake " diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/LeaderAppendLogEntryGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/LeaderAppendLogEntryGenerator.java index 8f6575526c64..bf2cc044d99d 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/LeaderAppendLogEntryGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/LeaderAppendLogEntryGenerator.java @@ -251,18 +251,26 @@ private void configureGroup() throws IOException { RaftPeerId.getRaftPeerId(serverId); RaftGroup group = RaftGroup.valueOf(groupId, - new RaftPeer(RaftPeerId.valueOf(serverId), serverAddress), - new RaftPeer(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1), - FAKE_LEADER_ADDDRESS1), - new RaftPeer(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1), - FAKE_LEADER_ADDDRESS2)); + RaftPeer.newBuilder() + .setId(serverId) + .setAddress(serverAddress) + .build(), + RaftPeer.newBuilder() + .setId(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1)) + .setAddress(FAKE_LEADER_ADDDRESS1) + .build(), + RaftPeer.newBuilder() + .setId(RaftPeerId.valueOf(FAKE_FOLLOWER_ID1)) + .setAddress(FAKE_LEADER_ADDDRESS2) + .build()); RaftClient client = RaftClient.newBuilder() .setClientId(clientId) .setProperties(new RaftProperties(true)) .setRaftGroup(group) .build(); - RaftClientReply raftClientReply = client.groupAdd(group, peerId); + RaftClientReply raftClientReply = client.getGroupManagementApi(peerId) + .add(group); LOG.info( "Group is configured in the RAFT server (with two fake leader leader)" diff --git a/pom.xml b/pom.xml index 38bbb1af630f..07ec99d586ca 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 1.1.0-11689cd-SNAPSHOT + 1.1.0-913f5a4-SNAPSHOT 0.6.0-SNAPSHOT