Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolProtos.CopyDBCheckpointResponseProto;
import org.apache.hadoop.hdds.protocol.scm.proto.InterSCMProtocolServiceGrpc;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
Expand Down Expand Up @@ -52,13 +51,10 @@ public class InterSCMGrpcClient implements SCMSnapshotDownloader{
private final InterSCMProtocolServiceGrpc.InterSCMProtocolServiceStub
client;

private final long timeout;

public InterSCMGrpcClient(final String host, final ConfigurationSource conf) {
Preconditions.checkNotNull(conf);
int port = conf.getInt(ScmConfigKeys.OZONE_SCM_GRPC_PORT_KEY,
ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT);
timeout =
public InterSCMGrpcClient(final String host, final int leaderPort,
final ConfigurationSource conf) {
final int port = leaderPort;
final long timeout =
conf.getObject(SCMHAConfiguration.class).getGrpcDeadlineInterval();
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forAddress(host, port).usePlaintext()
Expand Down Expand Up @@ -95,7 +91,7 @@ public void shutdown() {
}

@Override
public void close() throws Exception {
public void close() {
shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void start() throws IOException {
LOG.info("Ignore. already started.");
return;
} else {
LOG.info("Starting SCM Grpc Service at port {}", port);
server.start();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,19 @@ public void start() throws IOException {
if (ratisServer.getDivision().getGroup().getPeers().isEmpty()) {
// this is a bootstrapped node
// It will first try to add itself to existing ring
boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
final SCMNodeDetails nodeDetails =
scm.getSCMHANodeDetails().getLocalNodeDetails();
final boolean success = HAUtils.addSCM(OzoneConfiguration.of(conf),
new AddSCMRequest.Builder().setClusterId(scm.getClusterId())
.setScmId(scm.getScmId())
.setRatisAddr(scm.getSCMHANodeDetails().getLocalNodeDetails()
.setRatisAddr(nodeDetails
// TODO : Should we use IP instead of hostname??
.getRatisHostPortStr()).build(), scm.getSCMNodeId());
if (!success) {
throw new IOException("Adding SCM to existing HA group failed");
} else {
LOG.info("Successfully added SCM {} to group {}",
nodeDetails.getNodeId(), ratisServer.getDivision().getGroup());
}
} else {
LOG.info(" scm role is {} peers {}",
Expand Down Expand Up @@ -356,6 +361,11 @@ public void setExitManagerForTesting(ExitManager exitManagerForTesting) {
this.exitManager = exitManagerForTesting;
}

/**
 * Calls {@code stop()} on {@code grpcServer}.
 * Annotated {@code @VisibleForTesting}: intended as a hook for tests, not
 * for production callers.
 */
@VisibleForTesting
public void stopGrpcService() {
grpcServer.stop();
}

@VisibleForTesting
public static Logger getLogger() {
return LOG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,8 @@ public InetSocketAddress getDatanodeProtocolServerAddress() {
public String getDatanodeAddressKey() {
return datanodeAddressKey;
}

/**
 * Returns the value of the {@code grpcPort} field.
 *
 * @return the gRPC port stored for this node
 */
public int getGrpcPort() {
return grpcPort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface SCMSnapshotDownloader {
*/
CompletableFuture<Path> download(Path destination) throws IOException;

void close() throws Exception;
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public class SCMSnapshotProvider {

private final ConfigurationSource conf;

private SCMSnapshotDownloader client;

private Map<String, SCMNodeDetails> peerNodesMap;

public SCMSnapshotProvider(ConfigurationSource conf,
Expand All @@ -81,13 +79,13 @@ public SCMSnapshotProvider(ConfigurationSource conf,
this.peerNodesMap.put(peerNode.getNodeId(), peerNode);
}
}
this.client = null;
}

/**
 * Replaces the peer-node map with the supplied one. The reference is stored
 * as-is (no copy is made). Annotated {@code @VisibleForTesting}.
 *
 * @param peerNodesMap map of peer node details; the constructor populates
 *                     the field keyed by node ID
 */
@VisibleForTesting
public void setPeerNodesMap(Map<String, SCMNodeDetails> peerNodesMap) {
this.peerNodesMap = peerNodesMap;
}

/**
* Download the latest checkpoint from the SCM leader.
* @param leaderSCMNodeID leader SCM Node ID.
Expand All @@ -103,18 +101,19 @@ public DBCheckpoint getSCMDBSnapshot(String leaderSCMNodeID)
.getAbsolutePath();
File targetFile = new File(snapshotFilePath + ".tar.gz");

// the client instance will be initialized only when first install snapshot
// notification from ratis leader will be received.
if (client == null) {
client = new InterSCMGrpcClient(
peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
conf);
}
// The downloadClient instance is created each time an install snapshot
// request is received. The client is not cached, as this should be a very
// rare event.
int port = peerNodesMap.get(leaderSCMNodeID).getGrpcPort();
SCMSnapshotDownloader downloadClient = new InterSCMGrpcClient(
peerNodesMap.get(leaderSCMNodeID).getInetAddress().getHostAddress(),
port, conf);
try {
client.download(targetFile.toPath()).get();
} catch (InterruptedException | ExecutionException e) {
downloadClient.download(targetFile.toPath()).get();
} catch (ExecutionException | InterruptedException e) {
LOG.error("Rocks DB checkpoint downloading failed", e);
throw new IOException(e);
} finally {
downloadClient.close();
}


Expand All @@ -136,9 +135,4 @@ public File getScmSnapshotDir() {
return scmSnapshotDir;
}

public void stop() throws Exception {
if (client != null) {
client.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.EnumMap;
import java.util.Map;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -184,11 +186,20 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {

String leaderNodeId = RaftPeerId.valueOf(roleInfoProto.getFollowerInfo()
.getLeaderInfo().getId().getId()).toString();
if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
return JavaUtils.completeExceptionally(new IOException("Failed to " +
"notifyInstallSnapshotFromLeader due to missing leader info"));
}
String leaderAddress = roleInfoProto.getFollowerInfo()
.getLeaderInfo().getId().getAddress();
Optional<SCMNodeDetails> leaderDetails =
scm.getSCMHANodeDetails().getPeerNodeDetails().stream().filter(
p -> p.getRatisHostPortStr().equals(leaderAddress))
.findFirst();
Preconditions.checkState(leaderDetails.isPresent());
final String leaderNodeId = leaderDetails.get().getNodeId();
LOG.info("Received install snapshot notification from SCM leader: {} with "
+ "term index: {}", leaderNodeId, firstTermIndexInLog);
+ "term index: {}", leaderAddress, firstTermIndexInLog);

CompletableFuture<TermIndex> future = CompletableFuture.supplyAsync(
() -> scm.getScmHAManager().installSnapshotFromLeader(leaderNodeId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ private DBCheckpoint downloadSnapshot() throws Exception {
RATIS, ONE, "Owner2").getPipelineID());
pipelineManager.openPipeline(ratisPipeline2.getId());
SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
.setRpcAddress(new InetSocketAddress("0.0.0.0", 0)).setSCMNodeId("scm1")
.setRpcAddress(new InetSocketAddress("0.0.0.0", 0))
.setGrpcPort(ScmConfigKeys.OZONE_SCM_GRPC_PORT_DEFAULT)
.setSCMNodeId("scm1")
.build();
Map<String, SCMNodeDetails> peerMap = new HashMap<>();
peerMap.put(scmNodeDetails.getNodeId(), scmNodeDetails);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Disabled;
import org.slf4j.Logger;
import org.slf4j.event.Level;

Expand All @@ -70,7 +69,7 @@ public class TestSCMInstallSnapshotWithHA {
private int numOfSCMs = 3;

private static final long SNAPSHOT_THRESHOLD = 5;
// private static final int LOG_PURGE_GAP = 5;
private static final int LOG_PURGE_GAP = 5;

/**
* Create a MiniOzoneCluster for testing.
Expand All @@ -86,8 +85,8 @@ public void init() throws Exception {
scmServiceId = "scm-service-test1";
SCMHAConfiguration scmhaConfiguration =
conf.getObject(SCMHAConfiguration.class);
// scmhaConfiguration.setRaftLogPurgeEnabled(true);
// scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
scmhaConfiguration.setRaftLogPurgeEnabled(true);
scmhaConfiguration.setRaftLogPurgeGap(LOG_PURGE_GAP);
scmhaConfiguration.setRatisSnapshotThreshold(SNAPSHOT_THRESHOLD);
conf.setFromObject(scmhaConfiguration);

Expand All @@ -113,22 +112,10 @@ public void shutdown() {
}
}

/**
* This test is disabled for now as there seems to be an issue with
* Ratis install Snapshot code. In ratis while a new node gets added,
* unless and until the node gets added to the voter list, the follower state
* is not updated with leader info. So, while an install snapshot notification
* is received in the leader, the leader info is not set and hence, out of
* ratis transfer using the same leader info doesn't work.
*
* TODO: Fix this
* */
@Test
@Disabled
public void testInstallSnapshot() throws Exception {
// Get the leader SCM
StorageContainerManager leaderSCM = getLeader(cluster);
String leaderNodeId = leaderSCM.getScmNodeDetails().getNodeId();
Assert.assertNotNull(leaderSCM);
// Find the inactive SCM
String followerId = getInactiveSCM(cluster).getScmId();
Expand All @@ -137,44 +124,17 @@ public void testInstallSnapshot() throws Exception {
// Do some transactions so that the log index increases
List<ContainerInfo> containers = writeToIncreaseLogIndex(leaderSCM, 200);

// Get the latest db checkpoint from the leader SCM.
TransactionInfo transactionInfo =
leaderSCM.getScmHAManager().asSCMHADBTransactionBuffer()
.getLatestTrxInfo();
TermIndex leaderTermIndex =
TermIndex.valueOf(transactionInfo.getTerm(),
transactionInfo.getTransactionIndex());
long leaderSnaphsotIndex = leaderTermIndex.getIndex();
long leaderSnapshotTermIndex = leaderTermIndex.getTerm();

DBCheckpoint leaderDbCheckpoint =
leaderSCM.getScmMetadataStore().getStore().getCheckpoint(false);

// Start the inactive
// Start the inactive SCM. Install Snapshot will happen as part
// of setConfiguration() call to ratis leader and the follower will catch
// up
cluster.startInactiveSCM(followerId);

// The recently started should be lagging behind the leader .
long followerLastAppliedIndex =
follower.getScmHAManager().getRatisServer().getSCMStateMachine()
.getLastAppliedTermIndex().getIndex();
assertTrue(
followerLastAppliedIndex < leaderSnaphsotIndex);

SCMHAManagerImpl scmhaManager =
(SCMHAManagerImpl) (follower.getScmHAManager());
// Install leader 's db checkpoint on the lagging .
scmhaManager.installCheckpoint(leaderNodeId, leaderDbCheckpoint);

SCMStateMachine followerStateMachine =
follower.getScmHAManager().getRatisServer().getSCMStateMachine();
// After the new checkpoint is installed, the follower
// lastAppliedIndex must >= the snapshot index of the checkpoint. It
// could be great than snapshot index if there is any conf entry from ratis.
followerLastAppliedIndex = followerStateMachine
.getLastAppliedTermIndex().getIndex();
assertTrue(followerLastAppliedIndex >= leaderSnaphsotIndex);
assertTrue(followerStateMachine
.getLastAppliedTermIndex().getTerm() >= leaderSnapshotTermIndex);
followerLastAppliedIndex >= 200);

// Verify that the follower 's DB contains the transactions which were
// made while it was inactive.
Expand Down Expand Up @@ -317,7 +277,7 @@ private List<ContainerInfo> writeToIncreaseLogIndex(
scm.getScmHAManager().getRatisServer().getSCMStateMachine();
long logIndex = scm.getScmHAManager().getRatisServer().getSCMStateMachine()
.getLastAppliedTermIndex().getIndex();
while (logIndex < targetLogIndex) {
while (logIndex <= targetLogIndex) {
containers.add(scm.getContainerManager()
.allocateContainer(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<declared.ozone.version>${ozone.version}</declared.ozone.version>

<!-- Apache Ratis version -->
<ratis.version>2.0.0</ratis.version>
<ratis.version>2.1.0-43915d2-SNAPSHOT</ratis.version>

<!-- Apache Ratis thirdparty version -->
<ratis.thirdparty.version>0.6.0</ratis.thirdparty.version>
<ratis.thirdparty.version>0.7.0-a398b19-SNAPSHOT</ratis.thirdparty.version>

<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
Expand Down Expand Up @@ -183,7 +183,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<grpc-compile.version>1.33.0</grpc-compile.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>

<netty.version>4.1.51.Final</netty.version>
<netty.version>4.1.63.Final</netty.version>

<!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version>
Expand Down