diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java index 54de06efb98a..fc3c1548ba14 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -96,9 +96,7 @@ public SCMHAManagerImpl(final ConfigurationSource conf, this.transactionBuffer = new SCMHADBTransactionBufferImpl(scm); this.ratisServer = new SCMRatisServerImpl(conf, scm, (SCMHADBTransactionBuffer) transactionBuffer); - this.scmSnapshotProvider = new SCMSnapshotProvider(conf, - scm.getSCMHANodeDetails().getPeerNodeDetails(), - scm.getScmCertificateClient()); + this.scmSnapshotProvider = newScmSnapshotProvider(scm); grpcServer = new InterSCMGrpcProtocolService(conf, scm); } else { this.transactionBuffer = new SCMDBTransactionBufferImpl(); @@ -109,6 +107,13 @@ public SCMHAManagerImpl(final ConfigurationSource conf, } + @VisibleForTesting + protected SCMSnapshotProvider newScmSnapshotProvider(StorageContainerManager storageContainerManager) { + return new SCMSnapshotProvider(storageContainerManager.getConfiguration(), + storageContainerManager.getSCMHANodeDetails().getPeerNodeDetails(), + storageContainerManager.getScmCertificateClient()); + } + /** * {@inheritDoc} */ @@ -194,8 +199,10 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) { } DBCheckpoint dBCheckpoint = getDBCheckpointFromLeader(leaderId); - LOG.info("Downloaded checkpoint from Leader {} to the location {}", - leaderId, dBCheckpoint.getCheckpointLocation()); + if (dBCheckpoint != null) { + LOG.info("Downloaded checkpoint from Leader {} to the location {}", + leaderId, dBCheckpoint.getCheckpointLocation()); + } return dBCheckpoint; } @@ -262,7 +269,7 @@ private DBCheckpoint getDBCheckpointFromLeader(String leaderId) { try { return scmSnapshotProvider.getSCMDBSnapshot(leaderId); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed to download checkpoint from SCM leader {}", leaderId, e); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java index 7eab815446ca..1128accd2ff4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java @@ -233,8 +233,8 @@ public CompletableFuture notifyInstallSnapshotFromLeader( String leaderAddress = roleInfoProto.getFollowerInfo() .getLeaderInfo().getId().getAddress(); Optional leaderDetails = - scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter( - p -> p.getRatisHostPortStr().equals(leaderAddress)) + scm.getSCMHANodeDetails().getPeerNodeDetails().stream() + .filter(p -> p.getRatisHostPortStr().equals(leaderAddress)) .findFirst(); Preconditions.checkState(leaderDetails.isPresent()); final String leaderNodeId = leaderDetails.get().getNodeId(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java index f33eedf9695b..294663ba75cc 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMHAManagerImpl.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.ha; +import java.net.InetSocketAddress; import java.nio.file.Path; import java.time.Clock; import java.time.ZoneOffset; @@ -40,8 +41,10 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.BatchOperation; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; import org.apache.hadoop.hdds.utils.db.DBStore; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.server.DivisionInfo; @@ -55,9 +58,11 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -73,6 +78,8 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class TestSCMHAManagerImpl { + private static final String LEADER_SCM_ID = "leader"; + private static final int LEADER_PORT = 9894; private static final String FOLLOWER_SCM_ID = "follower"; private Path storageBaseDir; @@ -85,18 +92,17 @@ void setup(@TempDir Path tempDir) throws IOException, InterruptedException, TimeoutException { storageBaseDir = tempDir; clusterID = UUID.randomUUID().toString(); - OzoneConfiguration conf = getConfig("scm1", 9894); - final StorageContainerManager scm = getMockStorageContainerManager(conf); - SCMRatisServerImpl.initialize(clusterID, scm.getScmId(), - scm.getScmNodeDetails(), conf); + final StorageContainerManager scm = getMockStorageContainerManager(LEADER_SCM_ID, LEADER_PORT); + SCMRatisServerImpl.initialize(clusterID, LEADER_SCM_ID, scm.getScmNodeDetails(), scm.getConfiguration()); primarySCMHAManager = scm.getScmHAManager(); primarySCMHAManager.start(); final DivisionInfo ratisDivision = primarySCMHAManager.getRatisServer() .getDivision().getInfo(); // Wait for Ratis Server to be ready waitForSCMToBeReady(ratisDivision); - follower = getMockStorageContainerManager(getConfig(FOLLOWER_SCM_ID, 9898)) - .getScmHAManager().getRatisServer(); + StorageContainerManager followerSCM = getMockStorageContainerManager(FOLLOWER_SCM_ID, 9898); + follower = followerSCM.getScmHAManager() + .getRatisServer(); } private OzoneConfiguration getConfig(String scmId, int ratisPort) { @@ -150,10 +156,16 @@ private int getPeerCount() { .getDivision().getGroup().getPeers().size(); } + private String getRaftServerAddress(SCMRatisServer ratisServer) { + return "localhost:" + ratisServer.getDivision() + .getRaftServer() + .getServerRpc() + .getInetSocketAddress() + .getPort(); + } + private String getFollowerAddress() { - return "localhost:" + - follower.getDivision() - .getRaftServer().getServerRpc().getInetSocketAddress().getPort(); + return getRaftServerAddress(follower); } @Test @@ -186,9 +198,8 @@ void testHARingRemovalErrors() throws IOException, } } - private StorageContainerManager getMockStorageContainerManager( - OzoneConfiguration conf) throws IOException { - final String scmID = UUID.randomUUID().toString(); + private StorageContainerManager getMockStorageContainerManager(String scmID, int port) throws IOException { + OzoneConfiguration conf = getConfig(scmID, port); final DBStore dbStore = mock(DBStore.class); final SCMContext scmContext = mock(SCMContext.class); @@ -213,6 +224,7 @@ private StorageContainerManager getMockStorageContainerManager( mock(SCMDatanodeProtocolServer.class); when(scm.getClusterId()).thenReturn(clusterID); + when(scm.getConfiguration()).thenReturn(conf); when(scm.getScmId()).thenReturn(scmID); when(scm.getScmMetadataStore()).thenReturn(metadataStore); when(scm.getScmNodeDetails()).thenReturn(nodeDetails); @@ -231,12 +243,33 @@ private StorageContainerManager getMockStorageContainerManager( when(scmHANodeDetails.getLocalNodeDetails()).thenReturn(nodeDetails); when(blockManager.getDeletedBlockLog()).thenReturn(deletedBlockLog); when(dbStore.initBatchOperation()).thenReturn(batchOperation); - when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" + - conf.get(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY)); + when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" + port); when(scm.getSystemClock()).thenReturn(Clock.system(ZoneOffset.UTC)); + if (FOLLOWER_SCM_ID.equals(scmID)) { + final SCMNodeDetails leaderNodeDetails = mock(SCMNodeDetails.class); + final List peerNodeDetails = singletonList(leaderNodeDetails); + when(scmHANodeDetails.getPeerNodeDetails()).thenReturn(peerNodeDetails); + when(leaderNodeDetails.getNodeId()).thenReturn(LEADER_SCM_ID); + when(leaderNodeDetails.getGrpcPort()).thenReturn(LEADER_PORT); + when(leaderNodeDetails.getRatisHostPortStr()).thenReturn("localhost:" + LEADER_PORT); + InetSocketAddress rpcAddress = NetUtils.createSocketAddr("localhost", LEADER_PORT); + when(leaderNodeDetails.getRpcAddress()).thenReturn(rpcAddress); + when(leaderNodeDetails.getInetAddress()).thenReturn(rpcAddress.getAddress()); + } + + DBCheckpoint checkpoint = mock(DBCheckpoint.class); + SCMSnapshotProvider scmSnapshotProvider = mock(SCMSnapshotProvider.class); + when(scmSnapshotProvider.getSCMDBSnapshot(LEADER_SCM_ID)) + .thenReturn(checkpoint); + final SCMHAManager manager = new SCMHAManagerImpl(conf, - new SecurityConfig(conf), scm); + new SecurityConfig(conf), scm) { + @Override + protected SCMSnapshotProvider newScmSnapshotProvider(StorageContainerManager storageContainerManager) { + return scmSnapshotProvider; + } + }; when(scm.getScmHAManager()).thenReturn(manager); return scm; }