diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java index 2c4b98fa2e0..08dd307c1b2 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcClient.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos; import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto; import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; @@ -52,13 +51,10 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{ private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub client; - private final long timeout; - - public InterSCMGrpcClient(final String host, final ConfigurationSource conf) { - Preconditions.checkNotNull(conf); - int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY, - ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT); - timeout = + public InterSCMGrpcClient(final String host, final int leaderPort, + final ConfigurationSource conf) { + final int port = leaderPort; + final long timeout = conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval(); NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port).usePlaintext() @@ -95,7 +91,7 @@ public void shutdown() { } @Override - public void close() throws Exception { + public void close() { shutdown(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java index d92220ae53d..b6f08a5fcdf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/InterSCMGrpcProtocolService.java @@ -69,6 +69,7 @@ public void start() throws IOException { LOG.info("Ignore. already started."); return; } else { + LOG.info("Starting SCM Grpc Service at port {}", port); server.start(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index dca469ac9b7..00aff5ec8f7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -101,14 +101,19 @@ public void start() throws IOException { if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) { // this is a bootstrapped node // It will first try to add itself to existing ring - boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf), + final SCMNodeDetails nodeDetails = + scm.getSCMHANodeDetails().getLocalNodeDetails(); + final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf), new AddSCMRequest.Builder().setClusterId(scm.getClusterId()) .setScmId(scm.getScmId()) - .setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails() + .setRatisAddr(nodeDetails // TODO : Should we use IP instead of hostname?? .getRatisHostPortStr()).build(), scm.getSCMNodeId()); if (!success) { throw new IOException("Adding SCM to existing HA group failed"); + } else { + LOG.info("Successfully added SCM {} to group {}", + nodeDetails.getNodeId(), ratisServer.getDivision().getGroup()); } } else { LOG.info(" scm role is {} peers {}", @@ -356,6 +361,11 @@ public void setExitManagerForTesting(ExitManager exitManagerForTesting) { this.exitManager = exitManagerForTesting; } + @VisibleForTesting + public void stopGrpcService() { + grpcServer.stop(); + } + @VisibleForTesting public static Logger getLogger() { return LOG; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java index 47b0a233592..17901ecfde5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java @@ -210,4 +210,8 @@ public InetSocketAddress getDatanodeProtocolServerAddress() { public String getDatanodeAddressKey() { return datanodeAddressKey; } + + public int getGrpcPort() { + return grpcPort; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java index 7d5d3eb58c1..7713a8cce52 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotDownloader.java @@ -39,5 +39,5 @@ public interface SCMSnapshotDownloader { */ CompletableFuture download(Path destination) throws IOException; - void close() throws Exception; + void close(); } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java index 093b810909d..e5bdfbe3884 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMSnapshotProvider.java @@ -55,8 +55,6 @@ public class SCMSnapshotProvider { private final ConfigurationSource conf; - private SCMSnapshotDownloader client; - private Map peerNodesMap; public SCMSnapshotProvider(ConfigurationSource conf, @@ -81,13 +79,13 @@ public SCMSnapshotProvider(ConfigurationSource conf, this.peerNodesMap.put(peerNode.getNodeId(), peerNode); } } - this.client = null; } @VisibleForTesting public void setPeerNodesMap(Map peerNodesMap) { this.peerNodesMap = peerNodesMap; } + /** * Download the latest checkpoint from SCM Leader . * @param leaderSCMNodeID leader SCM Node ID. @@ -103,18 +101,19 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID) .getAbsolutePath(); File targetFile = new File(snapshotFilePath + ".tar.gz"); - // the client instance will be initialized only when first install snapshot - // notification from ratis leader will be received. - if (client == null) { - client = new InterSCMGrpcClient( - peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(), - conf); - } + // the downloadClient instance will be created as and when install snapshot + // request is received. No caching of the client as it should be a very rare + int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort(); + SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient( + peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(), + port, conf); try { - client.download(targetFile.toPath()).get(); - } catch (InterruptedException | ExecutionException e) { + downloadClient.download(targetFile.toPath()).get(); + } catch (ExecutionException | InterruptedException e) { LOG.error("Rocks DB checkpoint downloading failed", e); throw new IOException(e); + } finally { + downloadClient.close(); } @@ -136,9 +135,4 @@ public File getScmSnapshotDir() { return scmSnapshotDir; } - public void stop() throws Exception { - if (client != null) { - client.close(); - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 29dcf7583b7..bef743b5ccb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -22,6 +22,7 @@ import java.util.EnumMap; import java.util.Map; import java.util.Collection; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -51,6 +52,7 @@ import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -184,11 +186,20 @@ public void notifyNotLeader(Collection pendingEntries) { @Override public CompletableFuture notifyInstallSnapshotFromLeader( RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) { - - String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo() - .getLeaderInfo().getId().getId()).toString(); + if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) { + return JavaUtils.completeExceptionally(new IOException("Failed to " + + "notifyInstallSnapshotFromLeader due to missing leader info")); + } + String leaderAddress = roleInfoProto.getFollowerInfo() + .getLeaderInfo().getId().getAddress(); + Optional leaderDetails = + scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter( + p -> p.getRatisHostPortStr().equals(leaderAddress)) + .findFirst(); + Preconditions.checkState(leaderDetails.isPresent()); + final String leaderNodeId = leaderDetails.get().getNodeId(); LOG.info("Received install snapshot notification from SCM leader: {} with " - + "term index: {}", leaderNodeId, firstTermIndexInLog); + + "term index: {}", leaderAddress, firstTermIndexInLog); CompletableFuture future = CompletableFuture.supplyAsync( () -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId), diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java index f3682edca3a..288365e9d9e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMInstallSnapshot.java @@ -100,7 +100,9 @@ private DBCheckpoint downloadSnapshot() throws Exception { RATIS, ONE, "Owner2").getPipelineID()); pipelineManager.openPipeline(ratisPipeline2.getId()); SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder() - .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1") + .setRpcAddress(new InetSocketAddress("0.0.0.0", 0)) + .setGrpcPort(ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT) + .setSCMNodeId("scm1") .build(); Map peerMap = new HashMap<>(); peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java index e2513244571..008bc108792 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMInstallSnapshotWithHA.java @@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.Disabled; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -70,7 +69,7 @@ public class TestSCMInstallSnapshotWithHA { private int numOfSCMs = 3; private static final long SNAPSHOT_THRESHOLD = 5; - // private static final int LOG_PURGE_GAP = 5; + private static final int LOG_PURGE_GAP = 5; /** * Create a MiniOzoneCluster for testing. @@ -86,8 +85,8 @@ public void init() throws Exception { scmServiceId = "scm-service-test1"; SCMHAConfiguration scmhaConfiguration = conf.getObject(SCMHAConfiguration.class); - // scmhaConfiguration.setRaftLogPurgeEnabled(true); - // scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP); + scmhaConfiguration.setRaftLogPurgeEnabled(true); + scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP); scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD); conf.setFromObject(scmhaConfiguration); @@ -113,22 +112,10 @@ public void shutdown() { } } - /** - * This test is disabled for now as there seems to be an issue with - * Ratis install Snapshot code. In ratis while a new node gets added, - * unless and until the node gets added to the voter list, the follower state - * is not updated with leader info. So, while an install snapshot notification - * is received in the leader, the leader info is not set and hence, out of - * ratis transfer using the same leader info doesn't work. - * - * TODO: Fix this - * */ @Test - @Disabled public void testInstallSnapshot() throws Exception { // Get the leader SCM StorageContainerManager leaderSCM = getLeader(cluster); - String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId(); Assert.assertNotNull(leaderSCM); // Find the inactive SCM String followerId = getInactiveSCM(cluster).getScmId(); @@ -137,20 +124,9 @@ public void testInstallSnapshot() throws Exception { // Do some transactions so that the log index increases List containers = writeToIncreaseLogIndex(leaderSCM, 200); - // Get the latest db checkpoint from the leader SCM. - TransactionInfo transactionInfo = - leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer() - .getLatestTrxInfo(); - TermIndex leaderTermIndex = - TermIndex.valueOf(transactionInfo.getTerm(), - transactionInfo.getTransactionIndex()); - long leaderSnaphsotIndex = leaderTermIndex.getIndex(); - long leaderSnapshotTermIndex = leaderTermIndex.getTerm(); - - DBCheckpoint leaderDbCheckpoint = - leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false); - - // Start the inactive + // Start the inactive SCM. Install Snapshot will happen as part + // of setConfiguration() call to ratis leader and the follower will catch + // up cluster.startInactiveSCM(followerId); // The recently started should be lagging behind the leader . @@ -158,23 +134,7 @@ public void testInstallSnapshot() throws Exception { follower.getScmHAManager().getRatisServer().getSCMStateMachine() .getLastAppliedTermIndex().getIndex(); assertTrue( - followerLastAppliedIndex < leaderSnaphsotIndex); - - SCMHAManagerImpl scmhaManager = - (SCMHAManagerImpl) (follower.getScmHAManager()); - // Install leader 's db checkpoint on the lagging . - scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint); - - SCMStateMachine followerStateMachine = - follower.getScmHAManager().getRatisServer().getSCMStateMachine(); - // After the new checkpoint is installed, the follower - // lastAppliedIndex must >= the snapshot index of the checkpoint. It - // could be great than snapshot index if there is any conf entry from ratis. - followerLastAppliedIndex = followerStateMachine - .getLastAppliedTermIndex().getIndex(); - assertTrue(followerLastAppliedIndex >= leaderSnaphsotIndex); - assertTrue(followerStateMachine - .getLastAppliedTermIndex().getTerm() >= leaderSnapshotTermIndex); + followerLastAppliedIndex >= 200); // Verify that the follower 's DB contains the transactions which were // made while it was inactive. @@ -317,7 +277,7 @@ private List writeToIncreaseLogIndex( scm.getScmHAManager().getRatisServer().getSCMStateMachine(); long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine() .getLastAppliedTermIndex().getIndex(); - while (logIndex < targetLogIndex) { + while (logIndex <= targetLogIndex) { containers.add(scm.getContainerManager() .allocateContainer(HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE, diff --git a/pom.xml b/pom.xml index 4d13c35675b..2af3f605b01 100644 --- a/pom.xml +++ b/pom.xml @@ -79,10 +79,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs ${ozone.version} - 2.0.0 + 2.1.0-43915d2-SNAPSHOT - 0.6.0 + 0.7.0-a398b19-SNAPSHOT apache.snapshots.https Apache Development Snapshot Repository @@ -183,7 +183,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs 1.33.0 1.5.0.Final - 4.1.51.Final + 4.1.63.Final 1.8