Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,9 @@ private OzoneConsts() {

// SCM HA
public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault";

// SCM Ratis snapshot file to store the last applied index
public static final String SCM_RATIS_SNAPSHOT_INDEX = "scmRatisSnapshotIndex";

public static final String SCM_RATIS_SNAPSHOT_TERM = "scmRatisSnapshotTerm";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

/**
* SCM utility class.
*/
Expand All @@ -48,4 +50,14 @@ public static void preCheck(ScmOps operation, Precheck... preChecks)
}
}

/**
* Create SCM directory file based on given path.
*/
public static File createSCMDir(String dirPath) {
File dirFile = new File(dirPath);
if (!dirFile.mkdirs() && !dirFile.exists()) {
throw new IllegalArgumentException("Unable to create path: " + dirFile);
}
return dirFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,9 @@ ContainerInfo getMatchingContainer(long size, String owner,
* @param success
*/
void notifyContainerReportProcessing(boolean isFullReport, boolean success);

/**
* Flush metadata of container manager if they are required to be persisted.
*/
void flushDB() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,13 @@ private HddsProtos.LifeCycleState updateContainerState(
}
}

@Override
public void flushDB() throws IOException {
if (containerStore != null) {
containerStore.flushDB(true);
}
}

/**
* Update deleteTransactionId according to deleteTransactionMap.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,15 @@

package org.apache.hadoop.hdds.scm.ha;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;

import java.io.File;
import java.util.Collection;

/**
* Utility class used by SCM HA.
Expand All @@ -34,4 +41,39 @@ public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY,
ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT);
}

public static File createSCMRatisDir(OzoneConfiguration conf)
throws IllegalArgumentException {
String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf);
if (scmRatisDir == null || scmRatisDir.isEmpty()) {
throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS +
" must be defined.");
}
return ScmUtils.createSCMDir(scmRatisDir);
}

/**
* Get a collection of all scmNodeIds for the given scmServiceId.
*/
public static Collection<String> getSCMNodeIds(Configuration conf,
String scmServiceId) {
String key = addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
return conf.getTrimmedStringCollection(key);
}

public static String getLocalSCMNodeId(String scmServiceId) {
return addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId);
}

/**
* Add non empty and non null suffix to a key.
*/
private static String addSuffix(String key, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return key;
}
assert !suffix.startsWith(".") :
"suffix '" + suffix + "' should not already have '.' prepended.";
return key + "." + suffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,6 +28,7 @@
import java.net.InetSocketAddress;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY;

/**
* Construct SCM node details.
Expand Down Expand Up @@ -153,6 +153,18 @@ public String getRpcAddressString() {
public static SCMNodeDetails initStandAlone(
OzoneConfiguration conf) throws IOException {
String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
if (localSCMServiceId == null) {
// There is no internal om service id is being set, fall back to ozone
// .om.service.ids.
LOG.info("{} is not defined, falling back to {} to find serviceID for "
+ "SCM if it is HA enabled cluster",
OZONE_SCM_INTERNAL_SERVICE_ID, OZONE_SCM_SERVICE_IDS_KEY);
localSCMServiceId = conf.getTrimmed(
OZONE_SCM_SERVICE_IDS_KEY);
} else {
LOG.info("ServiceID for SCM is {}", localSCMServiceId);
}
String localSCMNodeId = SCMHAUtils.getLocalSCMNodeId(localSCMServiceId);
int ratisPort = conf.getInt(
ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
Expand All @@ -161,8 +173,8 @@ public static SCMNodeDetails initStandAlone(
SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder()
.setRatisPort(ratisPort)
.setRpcAddress(rpcAddress)
.setSCMNodeId(localSCMServiceId)
.setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
.setSCMNodeId(localSCMNodeId)
.setSCMServiceId(localSCMServiceId)
.build();
return scmNodeDetails;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.cache.RemovalListener;
import com.google.protobuf.BlockingService;

import java.io.File;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collections;
Expand All @@ -39,7 +40,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer;
import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisSnapshotInfo;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
Expand Down Expand Up @@ -110,6 +112,7 @@
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.hdds.utils.HddsVersionInfo;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.server.protocol.TermIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -196,6 +199,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl

// SCM HA related
private SCMRatisServer scmRatisServer;
private SCMRatisSnapshotInfo scmRatisSnapshotInfo;
private File scmRatisSnapshotDir;

private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
Expand Down Expand Up @@ -266,6 +271,9 @@ public StorageContainerManager(OzoneConfiguration conf,
}

if (SCMHAUtils.isSCMHAEnabled(conf)) {
this.scmRatisSnapshotInfo = new SCMRatisSnapshotInfo(
scmStorageConfig.getCurrentDir());
this.scmRatisSnapshotDir = SCMHAUtils.createSCMRatisDir(conf);
initializeRatisServer();
} else {
scmRatisServer = null;
Expand Down Expand Up @@ -788,6 +796,10 @@ public void start() throws IOException {
getClientRpcAddress()));
}

if (scmRatisServer != null) {
scmRatisServer.start();
}

ms = HddsServerUtil
.initializeMetrics(configuration, "StorageContainerManager");

Expand Down Expand Up @@ -1134,4 +1146,38 @@ private void initializeRatisServer() throws IOException {
}
}
}

@VisibleForTesting
public SCMRatisServer getScmRatisServer() {
return scmRatisServer;
}

@VisibleForTesting
public SCMRatisSnapshotInfo getSnapshotInfo() {
return scmRatisSnapshotInfo;
}

@VisibleForTesting
public long getRatisSnapshotIndex() {
return scmRatisSnapshotInfo.getIndex();
}

/**
* Save ratis snapshot to SCM meta store and local disk.
*/
public TermIndex saveRatisSnapshot() throws IOException {
TermIndex snapshotIndex = scmRatisServer.getLastAppliedTermIndex();
if (scmMetadataStore != null) {
// Flush the SCM state to disk
scmMetadataStore.getStore().flush();
}

if (containerManager != null) {
containerManager.flushDB();
}

scmRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex);

return snapshotIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ratis;
package org.apache.hadoop.hdds.scm.server.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
Expand All @@ -42,6 +42,7 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.SizeInBytes;
Expand Down Expand Up @@ -94,6 +95,10 @@ private static long nextCallId() {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}

/**
* Creates a SCM Ratis Server.
* @throws IOException
*/
private SCMRatisServer(Configuration conf,
StorageContainerManager scm,
String raftGroupIdStr, RaftPeerId localRaftPeerId,
Expand Down Expand Up @@ -139,6 +144,9 @@ public void run() {
}, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
}

/**
* Create a SCM Ratis Server instance.
*/
public static SCMRatisServer newSCMRatisServer(
Configuration conf, StorageContainerManager scm,
SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
Expand Down Expand Up @@ -178,7 +186,7 @@ private SCMStateMachine getStateMachine() {
return new SCMStateMachine(this);
}

private RaftProperties newRaftProperties(Configuration conf){
private RaftProperties newRaftProperties(Configuration conf) {
final RaftProperties properties = new RaftProperties();
// Set RPC type
final String rpcType = conf.get(
Expand Down Expand Up @@ -403,6 +411,15 @@ public Optional<RaftPeerId> getCachedLeaderPeerId() {
}
}

public StorageContainerManager getSCM() {
return scm;
}

@VisibleForTesting
public SCMStateMachine getScmStateMachine() {
return scmStateMachine;
}

public int getServerPort() {
return port;
}
Expand Down Expand Up @@ -441,6 +458,10 @@ public void updateServerRole() {
}
}

public TermIndex getLastAppliedTermIndex() {
return scmStateMachine.getLastAppliedTermIndex();
}

private GroupInfoReply getGroupInfo() throws IOException {
GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
raftPeerId, raftGroupId, nextCallId());
Expand Down
Loading