Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ public SCMHAManagerImpl(final ConfigurationSource conf,
this.transactionBuffer = new SCMHADBTransactionBufferImpl(scm);
this.ratisServer = new SCMRatisServerImpl(conf, scm,
(SCMHADBTransactionBuffer) transactionBuffer);
this.scmSnapshotProvider = new SCMSnapshotProvider(conf,
scm.getSCMHANodeDetails().getPeerNodeDetails(),
scm.getScmCertificateClient());
this.scmSnapshotProvider = newScmSnapshotProvider(scm);
grpcServer = new InterSCMGrpcProtocolService(conf, scm);
} else {
this.transactionBuffer = new SCMDBTransactionBufferImpl();
Expand All @@ -109,6 +107,13 @@ public SCMHAManagerImpl(final ConfigurationSource conf,

}

@VisibleForTesting
protected SCMSnapshotProvider newScmSnapshotProvider(StorageContainerManager storageContainerManager) {
return new SCMSnapshotProvider(storageContainerManager.getConfiguration(),
storageContainerManager.getSCMHANodeDetails().getPeerNodeDetails(),
storageContainerManager.getScmCertificateClient());
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -194,8 +199,10 @@ public DBCheckpoint downloadCheckpointFromLeader(String leaderId) {
}

DBCheckpoint dBCheckpoint = getDBCheckpointFromLeader(leaderId);
LOG.info("Downloaded checkpoint from Leader {} to the location {}",
leaderId, dBCheckpoint.getCheckpointLocation());
if (dBCheckpoint != null) {
LOG.info("Downloaded checkpoint from Leader {} to the location {}",
leaderId, dBCheckpoint.getCheckpointLocation());
}
return dBCheckpoint;
}

Expand Down Expand Up @@ -262,7 +269,7 @@ private DBCheckpoint getDBCheckpointFromLeader(String leaderId) {

try {
return scmSnapshotProvider.getSCMDBSnapshot(leaderId);
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to download checkpoint from SCM leader {}", leaderId,
e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
String leaderAddress = roleInfoProto.getFollowerInfo()
.getLeaderInfo().getId().getAddress();
Optional<SCMNodeDetails> leaderDetails =
scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter(
p -> p.getRatisHostPortStr().equals(leaderAddress))
scm.getSCMHANodeDetails().getPeerNodeDetails().stream()
.filter(p -> p.getRatisHostPortStr().equals(leaderAddress))
.findFirst();
Preconditions.checkState(leaderDetails.isPresent());
final String leaderNodeId = leaderDetails.get().getNodeId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.ha;

import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.time.Clock;
import java.time.ZoneOffset;
Expand All @@ -40,8 +41,10 @@
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.server.DivisionInfo;
Expand All @@ -55,9 +58,11 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -73,6 +78,8 @@
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class TestSCMHAManagerImpl {

private static final String LEADER_SCM_ID = "leader";
private static final int LEADER_PORT = 9894;
private static final String FOLLOWER_SCM_ID = "follower";

private Path storageBaseDir;
Expand All @@ -85,18 +92,17 @@ void setup(@TempDir Path tempDir) throws IOException, InterruptedException,
TimeoutException {
storageBaseDir = tempDir;
clusterID = UUID.randomUUID().toString();
OzoneConfiguration conf = getConfig("scm1", 9894);
final StorageContainerManager scm = getMockStorageContainerManager(conf);
SCMRatisServerImpl.initialize(clusterID, scm.getScmId(),
scm.getScmNodeDetails(), conf);
final StorageContainerManager scm = getMockStorageContainerManager(LEADER_SCM_ID, LEADER_PORT);
SCMRatisServerImpl.initialize(clusterID, LEADER_SCM_ID, scm.getScmNodeDetails(), scm.getConfiguration());
primarySCMHAManager = scm.getScmHAManager();
primarySCMHAManager.start();
final DivisionInfo ratisDivision = primarySCMHAManager.getRatisServer()
.getDivision().getInfo();
// Wait for Ratis Server to be ready
waitForSCMToBeReady(ratisDivision);
follower = getMockStorageContainerManager(getConfig(FOLLOWER_SCM_ID, 9898))
.getScmHAManager().getRatisServer();
StorageContainerManager followerSCM = getMockStorageContainerManager(FOLLOWER_SCM_ID, 9898);
follower = followerSCM.getScmHAManager()
.getRatisServer();
}

private OzoneConfiguration getConfig(String scmId, int ratisPort) {
Expand Down Expand Up @@ -150,10 +156,16 @@ private int getPeerCount() {
.getDivision().getGroup().getPeers().size();
}

private String getRaftServerAddress(SCMRatisServer ratisServer) {
return "localhost:" + ratisServer.getDivision()
.getRaftServer()
.getServerRpc()
.getInetSocketAddress()
.getPort();
}

private String getFollowerAddress() {
return "localhost:" +
follower.getDivision()
.getRaftServer().getServerRpc().getInetSocketAddress().getPort();
return getRaftServerAddress(follower);
}

@Test
Expand Down Expand Up @@ -186,9 +198,8 @@ void testHARingRemovalErrors() throws IOException,
}
}

private StorageContainerManager getMockStorageContainerManager(
OzoneConfiguration conf) throws IOException {
final String scmID = UUID.randomUUID().toString();
private StorageContainerManager getMockStorageContainerManager(String scmID, int port) throws IOException {
OzoneConfiguration conf = getConfig(scmID, port);

final DBStore dbStore = mock(DBStore.class);
final SCMContext scmContext = mock(SCMContext.class);
Expand All @@ -213,6 +224,7 @@ private StorageContainerManager getMockStorageContainerManager(
mock(SCMDatanodeProtocolServer.class);

when(scm.getClusterId()).thenReturn(clusterID);
when(scm.getConfiguration()).thenReturn(conf);
when(scm.getScmId()).thenReturn(scmID);
when(scm.getScmMetadataStore()).thenReturn(metadataStore);
when(scm.getScmNodeDetails()).thenReturn(nodeDetails);
Expand All @@ -231,12 +243,33 @@ private StorageContainerManager getMockStorageContainerManager(
when(scmHANodeDetails.getLocalNodeDetails()).thenReturn(nodeDetails);
when(blockManager.getDeletedBlockLog()).thenReturn(deletedBlockLog);
when(dbStore.initBatchOperation()).thenReturn(batchOperation);
when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" +
conf.get(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY));
when(nodeDetails.getRatisHostPortStr()).thenReturn("localhost:" + port);
when(scm.getSystemClock()).thenReturn(Clock.system(ZoneOffset.UTC));

if (FOLLOWER_SCM_ID.equals(scmID)) {
final SCMNodeDetails leaderNodeDetails = mock(SCMNodeDetails.class);
final List<SCMNodeDetails> peerNodeDetails = singletonList(leaderNodeDetails);
when(scmHANodeDetails.getPeerNodeDetails()).thenReturn(peerNodeDetails);
when(leaderNodeDetails.getNodeId()).thenReturn(LEADER_SCM_ID);
when(leaderNodeDetails.getGrpcPort()).thenReturn(LEADER_PORT);
when(leaderNodeDetails.getRatisHostPortStr()).thenReturn("localhost:" + LEADER_PORT);
InetSocketAddress rpcAddress = NetUtils.createSocketAddr("localhost", LEADER_PORT);
when(leaderNodeDetails.getRpcAddress()).thenReturn(rpcAddress);
when(leaderNodeDetails.getInetAddress()).thenReturn(rpcAddress.getAddress());
}

DBCheckpoint checkpoint = mock(DBCheckpoint.class);
SCMSnapshotProvider scmSnapshotProvider = mock(SCMSnapshotProvider.class);
when(scmSnapshotProvider.getSCMDBSnapshot(LEADER_SCM_ID))
.thenReturn(checkpoint);

final SCMHAManager manager = new SCMHAManagerImpl(conf,
new SecurityConfig(conf), scm);
new SecurityConfig(conf), scm) {
@Override
protected SCMSnapshotProvider newScmSnapshotProvider(StorageContainerManager storageContainerManager) {
return scmSnapshotProvider;
}
};
when(scm.getScmHAManager()).thenReturn(manager);
return scm;
}
Expand Down