hdds.metadata.dir
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
new file mode 100644
index 000000000000..c0364adb1b16
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
+/**
+ * Static helpers for reading SCM high-availability (HA) settings.
+ */
+public final class SCMHAUtils {
+
+  private SCMHAUtils() {
+    // Utility holder; instances are never created.
+  }
+
+  /**
+   * Looks up whether SCM HA is switched on.
+   *
+   * @param conf configuration to query
+   * @return the boolean stored under
+   *         {@link ScmConfigKeys#OZONE_SCM_HA_ENABLE_KEY}; the lookup is given
+   *         {@link ScmConfigKeys#OZONE_SCM_HA_ENABLE_DEFAULT} as its default
+   */
+  public static boolean isSCMHAEnabled(OzoneConfiguration conf) {
+    final boolean fallback = ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT;
+    return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, fallback);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
new file mode 100644
index 000000000000..8d66187ee748
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID;
+
+/**
+ * Construct SCM node details.
+ */
+public final class SCMNodeDetails {
+  // Immutable after construction: each instance field below is written only
+  // by the private constructor, so all of them are declared final.
+  private final String scmServiceId;
+  private final String scmNodeId;
+  private final InetSocketAddress rpcAddress;
+  // Port that Builder#setRpcAddress extracted from the RPC address.
+  private final int rpcPort;
+  private final int ratisPort;
+  // The two web addresses are only reported by toString(); this class has no
+  // accessor for them.
+  private final String httpAddress;
+  private final String httpsAddress;
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodeDetails.class);
+
+  /**
+   * Stores every argument verbatim in the matching field. The constructor is
+   * private; within this class only {@link Builder#build()} invokes it.
+   */
+  private SCMNodeDetails(String serviceId, String nodeId,
+      InetSocketAddress rpcAddr, int rpcPort, int ratisPort,
+      String httpAddress, String httpsAddress) {
+    // Identity.
+    this.scmNodeId = nodeId;
+    this.scmServiceId = serviceId;
+    // Network endpoints.
+    this.rpcPort = rpcPort;
+    this.rpcAddress = rpcAddr;
+    this.ratisPort = ratisPort;
+    // Web endpoints.
+    this.httpsAddress = httpsAddress;
+    this.httpAddress = httpAddress;
+  }
+
+  /**
+   * Renders all fields as {@code SCMNodeDetails[name=value, ...]}.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("SCMNodeDetails[");
+    text.append("scmServiceId=").append(scmServiceId);
+    text.append(", scmNodeId=").append(scmNodeId);
+    text.append(", rpcAddress=").append(rpcAddress);
+    text.append(", rpcPort=").append(rpcPort);
+    text.append(", ratisPort=").append(ratisPort);
+    text.append(", httpAddress=").append(httpAddress);
+    text.append(", httpsAddress=").append(httpsAddress);
+    text.append("]");
+    return text.toString();
+  }
+
+  /**
+   * Fluent builder for {@link SCMNodeDetails}. Every setter returns this
+   * builder; values that are never set keep their Java defaults
+   * ({@code null} for references, {@code 0} for ports).
+   */
+  public static class Builder {
+    private String scmServiceId;
+    private String scmNodeId;
+    private InetSocketAddress rpcAddress;
+    private int rpcPort;
+    private int ratisPort;
+    private String httpAddr;
+    private String httpsAddr;
+
+    public Builder setSCMServiceId(String serviceId) {
+      this.scmServiceId = serviceId;
+      return this;
+    }
+
+    public Builder setSCMNodeId(String nodeId) {
+      this.scmNodeId = nodeId;
+      return this;
+    }
+
+    /**
+     * Records the RPC address and also derives the RPC port from it.
+     */
+    public Builder setRpcAddress(InetSocketAddress rpcAddr) {
+      this.rpcAddress = rpcAddr;
+      this.rpcPort = rpcAddr.getPort();
+      return this;
+    }
+
+    public Builder setRatisPort(int port) {
+      this.ratisPort = port;
+      return this;
+    }
+
+    public Builder setHttpAddress(String httpAddress) {
+      this.httpAddr = httpAddress;
+      return this;
+    }
+
+    public Builder setHttpsAddress(String httpsAddress) {
+      this.httpsAddr = httpsAddress;
+      return this;
+    }
+
+    /**
+     * Creates the node details from the values collected so far.
+     */
+    public SCMNodeDetails build() {
+      return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress, rpcPort,
+          ratisPort, httpAddr, httpsAddr);
+    }
+  }
+
+  /** @return the service ID given to the builder. */
+  public String getSCMServiceId() {
+    return this.scmServiceId;
+  }
+
+  /** @return the node ID given to the builder. */
+  public String getSCMNodeId() {
+    return this.scmNodeId;
+  }
+
+  /** @return the RPC socket address given to the builder. */
+  public InetSocketAddress getRpcAddress() {
+    return this.rpcAddress;
+  }
+
+  /**
+   * @return the IP address part of the RPC socket address; fails with a
+   *         NullPointerException when no RPC address was set
+   */
+  public InetAddress getAddress() {
+    return this.rpcAddress.getAddress();
+  }
+
+  /** @return the Ratis port given to the builder. */
+  public int getRatisPort() {
+    return this.ratisPort;
+  }
+
+  /** @return the port of the RPC socket address. */
+  public int getRpcPort() {
+    return this.rpcPort;
+  }
+
+  /** @return the RPC address as formatted by NetUtils#getHostPortString. */
+  public String getRpcAddressString() {
+    return NetUtils.getHostPortString(this.rpcAddress);
+  }
+
+  /**
+   * Builds the details of a single, stand-alone SCM node.
+   *
+   * <p>The RPC address is the local host with port 0, the Ratis port is read
+   * from {@link ScmConfigKeys#OZONE_SCM_RATIS_PORT_KEY}, and no HTTP(S)
+   * address is set.
+   *
+   * @param conf configuration supplying the internal service ID and the
+   *             Ratis port
+   * @return the populated node details
+   * @throws IOException if the local host address cannot be resolved
+   */
+  public static SCMNodeDetails initStandAlone(
+      OzoneConfiguration conf) throws IOException {
+    // NOTE(review): the value stored under the internal *service* ID key is
+    // used as the *node* ID below, while the service ID is a constant. The
+    // value is null when the key is unset. Confirm this is intended.
+    final String internalServiceId =
+        conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID);
+    final int configuredRatisPort = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    final InetSocketAddress localRpc =
+        new InetSocketAddress(InetAddress.getLocalHost(), 0);
+    return new SCMNodeDetails.Builder()
+        .setRatisPort(configuredRatisPort)
+        .setRpcAddress(localRpc)
+        .setSCMNodeId(internalServiceId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
new file mode 100644
index 000000000000..06fe1685717d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes related to SCM HA.
+ */
+package org.apache.hadoop.hdds.scm.ha;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
new file mode 100644
index 000000000000..af1e5c2c30ba
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Class for SCM Ratis Server.
+ */
+public final class SCMRatisServer {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SCMRatisServer.class);
+
+  private final StorageContainerManager scm;
+  private final SCMStateMachine scmStateMachine;
+
+  private final int port;
+  private final InetSocketAddress scmRatisAddress;
+  private final RaftServer server;
+  private final RaftGroupId raftGroupId;
+  private final RaftGroup raftGroup;
+  private final RaftPeerId raftPeerId;
+
+  private final ClientId clientId = ClientId.randomId();
+  private final ScheduledExecutorService scheduledRoleChecker;
+  // Both role-check timings are overwritten by newRaftProperties().
+  private long roleCheckInitialDelayMs = 1000; // 1 second default
+  private long roleCheckIntervalMs;
+  // Taken by setServerRole() and by the accessors of the two cached values
+  // declared below.
+  private final ReentrantReadWriteLock roleCheckLock =
+      new ReentrantReadWriteLock();
+  // Parameterized rather than raw so reads need no casts. Both stay empty
+  // until setServerRole() stores a value.
+  private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();
+  private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
+
+  // Source of the call IDs attached to the GroupInfoRequests built by this
+  // class.
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+  /**
+   * Hands out the next call ID. Masking with {@code Long.MAX_VALUE} clears
+   * the sign bit, so the result is never negative.
+   */
+  private static long nextCallId() {
+    final long raw = CALL_ID_COUNTER.getAndIncrement();
+    return raw & Long.MAX_VALUE;
+  }
+
+  /**
+   * Builds the Raft server for this SCM and schedules the periodic role
+   * check. The server is built here but not started.
+   *
+   * @param conf configuration used to derive the Ratis properties
+   * @param scm the owning StorageContainerManager
+   * @param raftGroupIdStr service ID that is hashed into the Raft group ID
+   * @param localRaftPeerId Raft peer ID of this server
+   * @param addr Ratis address of this server; its port becomes the server
+   *             port
+   * @param raftPeers peers forming the Raft group
+   * @throws IOException propagated from building the Raft server
+   */
+  private SCMRatisServer(Configuration conf,
+      StorageContainerManager scm,
+      String raftGroupIdStr, RaftPeerId localRaftPeerId,
+      InetSocketAddress addr, List<RaftPeer> raftPeers)
+      throws IOException {
+    this.scm = scm;
+    this.scmRatisAddress = addr;
+    // port must be assigned before newRaftProperties(), which reads it.
+    this.port = addr.getPort();
+    RaftProperties serverProperties = newRaftProperties(conf);
+
+    this.raftPeerId = localRaftPeerId;
+    this.raftGroupId = RaftGroupId.valueOf(
+        getRaftGroupIdFromOmServiceId(raftGroupIdStr));
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+
+    // Join with a running separator instead of trimming a ", " prefix, so an
+    // empty peer list logs an empty string rather than throwing.
+    StringBuilder raftPeersStr = new StringBuilder();
+    String separator = "";
+    for (RaftPeer peer : raftPeers) {
+      raftPeersStr.append(separator).append(peer.getAddress());
+      separator = ", ";
+    }
+    LOG.info("Instantiating SCM Ratis server with GroupID: {} and " +
+        "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString());
+    this.scmStateMachine = getStateMachine();
+
+    this.server = RaftServer.newBuilder()
+        .setServerId(this.raftPeerId)
+        .setGroup(this.raftGroup)
+        .setProperties(serverProperties)
+        .setStateMachine(scmStateMachine)
+        .build();
+
+    // Run a scheduler to check and update the server role on the leader
+    // periodically.
+    this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
+    this.scheduledRoleChecker.scheduleWithFixedDelay(() -> {
+      // Run this check only on the leader SCM. The cached role is read
+      // through the locked accessor because this task runs on its own thread.
+      if (checkCachedPeerRoleIsLeader()) {
+        updateServerRole();
+      }
+    }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Creates the Ratis server for the local SCM.
+   *
+   * <p>The Raft group consists of the local node followed by every entry of
+   * {@code peers}; each node's Ratis address is its host address combined
+   * with its Ratis port.
+   *
+   * @param conf configuration used to derive the Ratis properties
+   * @param scm the owning StorageContainerManager
+   * @param scmNodeDetails details of the local SCM
+   * @param peers details of the other SCMs in the ring; may be empty
+   * @return the new server, which still has to be started
+   * @throws IOException if the server cannot be constructed
+   */
+  public static SCMRatisServer newSCMRatisServer(
+      Configuration conf, StorageContainerManager scm,
+      SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers)
+      throws IOException {
+    String scmServiceId = scmNodeDetails.getSCMServiceId();
+
+    String scmNodeId = scmNodeDetails.getSCMNodeId();
+    RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
+    InetSocketAddress ratisAddr = new InetSocketAddress(
+        scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort());
+
+    RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
+
+    // Typed list: the local peer first, then the remote peers.
+    List<RaftPeer> raftPeers = new ArrayList<>();
+    raftPeers.add(localRaftPeer);
+
+    for (SCMNodeDetails peer : peers) {
+      String peerNodeId = peer.getSCMNodeId();
+      InetSocketAddress peerRatisAddr = new InetSocketAddress(
+          peer.getAddress(), peer.getRatisPort());
+      RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
+      RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
+      // Add other SCMs in Ratis ring
+      raftPeers.add(raftPeer);
+    }
+
+    return new SCMRatisServer(conf, scm, scmServiceId, localRaftPeerId,
+        ratisAddr, raftPeers);
+  }
+
+  /**
+   * Derives a name-based UUID from the UTF-8 bytes of the SCM service ID, so
+   * the same service ID always yields the same UUID.
+   */
+  private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) {
+    final byte[] idBytes = scmServiceId.getBytes(StandardCharsets.UTF_8);
+    return UUID.nameUUIDFromBytes(idBytes);
+  }
+
+  /** Creates a new state machine bound to this server. */
+  private SCMStateMachine getStateMachine() {
+    final SCMStateMachine stateMachine = new SCMStateMachine(this);
+    return stateMachine;
+  }
+
+  /**
+   * Builds the Ratis server properties from the SCM Ratis settings in
+   * {@code conf}.
+   *
+   * <p>Besides filling the returned properties, this method reads the
+   * {@code port} field and stores the role-check interval and initial delay
+   * in {@code roleCheckIntervalMs} and {@code roleCheckInitialDelayMs}. The
+   * constructor therefore calls it after assigning {@code port} and before
+   * scheduling the role checker.
+   *
+   * @param conf configuration holding the {@code ScmConfigKeys} Ratis keys
+   * @return the populated properties
+   */
+  private RaftProperties newRaftProperties(Configuration conf) {
+    final RaftProperties properties = new RaftProperties();
+    // Set RPC type
+    final String rpcType = conf.get(
+        ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_DEFAULT);
+    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+    // Set the ratis port number
+    if (rpc == SupportedRpcType.GRPC) {
+      GrpcConfigKeys.Server.setPort(properties, port);
+    } else if (rpc == SupportedRpcType.NETTY) {
+      NettyConfigKeys.Server.setPort(properties, port);
+    }
+    // Set Ratis storage directory
+    String storageDir = SCMRatisServer.getSCMRatisDirectory(conf);
+    RaftServerConfigKeys.setStorageDirs(properties,
+        Collections.singletonList(new File(storageDir)));
+    // Set RAFT segment size
+    final int raftSegmentSize = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(raftSegmentSize));
+    // Set RAFT segment pre-allocated size
+    final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+    int logAppenderQueueNumElements = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT);
+    final int logAppenderQueueByteLimit = (int) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
+        StorageUnit.BYTES);
+    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+        logAppenderQueueNumElements);
+    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
+        SizeInBytes.valueOf(logAppenderQueueByteLimit));
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+        false);
+    final int logPurgeGap = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP,
+        ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT);
+    RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap);
+    // For grpc set the maximum message size
+    // TODO: calculate the optimal max message size
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(logAppenderQueueByteLimit));
+
+    // Set the server request timeout
+    TimeUnit serverRequestTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit();
+    long serverRequestTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT
+            .getDuration(), serverRequestTimeoutUnit);
+    final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
+        serverRequestTimeoutDuration, serverRequestTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
+        serverRequestTimeout);
+    // Set timeout for server retry cache entry
+    TimeUnit retryCacheTimeoutUnit = ScmConfigKeys
+        .OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit();
+    long retryCacheTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT
+            .getDuration(), retryCacheTimeoutUnit);
+    final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
+        retryCacheTimeoutDuration, retryCacheTimeoutUnit);
+    RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+        retryCacheTimeout);
+    // Set the server min and max timeout
+    TimeUnit serverMinTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit();
+    long serverMinTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT
+            .getDuration(), serverMinTimeoutUnit);
+    final TimeDuration serverMinTimeout = TimeDuration.valueOf(
+        serverMinTimeoutDuration, serverMinTimeoutUnit);
+    // The max is computed in milliseconds, so it must also be wrapped as
+    // milliseconds and not in the unit of the configured minimum.
+    long serverMaxTimeoutDuration =
+        serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200;
+    final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
+        serverMaxTimeoutDuration, TimeUnit.MILLISECONDS);
+    // NOTE: Rpc timeout min/max are set again further down from the leader
+    // election timeout, which replaces the two values set here.
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        serverMinTimeout);
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        serverMaxTimeout);
+    // Set the number of maximum cached segments
+    RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
+    // TODO: set max write buffer size
+    // Set the ratis leader election timeout
+    TimeUnit leaderElectionMinTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long leaderElectionMinTimeoutduration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), leaderElectionMinTimeoutUnit);
+    final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+        leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
+    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+        leaderElectionMinTimeout);
+    long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+        TimeUnit.MILLISECONDS) + 200;
+    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+        TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
+    // Set the no-leader notification and slowness timeouts
+    TimeUnit nodeFailureTimeoutUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    long nodeFailureTimeoutDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), nodeFailureTimeoutUnit);
+    final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+        nodeFailureTimeoutDuration, nodeFailureTimeoutUnit);
+    RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
+        nodeFailureTimeout);
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+        nodeFailureTimeout);
+
+    // Ratis leader role check
+    TimeUnit roleCheckIntervalUnit =
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getUnit();
+    // Read the interval in its own unit: the value is wrapped with
+    // roleCheckIntervalUnit just below, so reading it in the node-failure
+    // unit would mis-scale it whenever the two units differ.
+    long roleCheckIntervalDuration = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getDuration(), roleCheckIntervalUnit);
+    this.roleCheckIntervalMs = TimeDuration.valueOf(
+        roleCheckIntervalDuration, roleCheckIntervalUnit)
+        .toLong(TimeUnit.MILLISECONDS);
+    this.roleCheckInitialDelayMs = leaderElectionMinTimeout
+        .toLong(TimeUnit.MILLISECONDS);
+
+    return properties;
+  }
+
+  /**
+   * Starts the underlying Raft server after logging its ID and port.
+   *
+   * @throws IOException if the Raft server fails to start
+   */
+  public void start() throws IOException {
+    final String serverName = getClass().getSimpleName();
+    LOG.info("Starting {} {} at port {}", serverName, server.getId(), port);
+    server.start();
+  }
+
+  /**
+   * Stops the periodic role checker, then closes the Raft server and stops
+   * the state machine.
+   *
+   * @throws RuntimeException wrapping any IOException raised while closing
+   *                          the Raft server
+   */
+  public void stop() {
+    // Shut the role checker down first. The constructor created it and
+    // nothing else releases its thread, and it must not query a server that
+    // is being closed.
+    scheduledRoleChecker.shutdownNow();
+    try {
+      server.close();
+      scmStateMachine.stop();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reports, under the read lock, whether the cached role is LEADER. An
+   * empty cache counts as "not leader".
+   */
+  private boolean checkCachedPeerRoleIsLeader() {
+    this.roleCheckLock.readLock().lock();
+    try {
+      // orElse(null) folds the "absent" case into the comparison.
+      return cachedPeerRole.orElse(null) == RaftProtos.RaftPeerRole.LEADER;
+    } finally {
+      this.roleCheckLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Tells whether this server is the Raft leader.
+   *
+   * <p>A cached LEADER role is trusted as is. Otherwise the role is refreshed
+   * from the Raft server once and the refreshed value decides.
+   *
+   * @return true if the (possibly refreshed) cached role is LEADER
+   */
+  public boolean isLeader() {
+    if (!checkCachedPeerRoleIsLeader()) {
+      // Cache does not say LEADER: ask the Raft server and look again.
+      updateServerRole();
+      return checkCachedPeerRoleIsLeader();
+    }
+    return true;
+  }
+
+  /** @return the life-cycle state reported by the Raft server. */
+  @VisibleForTesting
+  public LifeCycle.State getServerState() {
+    final LifeCycle.State state = server.getLifeCycleState();
+    return state;
+  }
+
+  /** @return the Raft peer ID of this server. */
+  @VisibleForTesting
+  public RaftPeerId getRaftPeerId() {
+    return raftPeerId;
+  }
+
+  /** @return the Raft group this server was created with. */
+  public RaftGroup getRaftGroup() {
+    return raftGroup;
+  }
+
+  /**
+   * Resolves the local directory where Ratis logs will be stored.
+   *
+   * @param conf configuration to read
+   * @return the value of {@link ScmConfigKeys#OZONE_SCM_RATIS_STORAGE_DIR}
+   *         when it is non-empty, otherwise the result of
+   *         {@code ServerUtils.getDefaultRatisDirectory(conf)}
+   */
+  public static String getSCMRatisDirectory(Configuration conf) {
+    final String configured =
+        conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR);
+    if (!Strings.isNullOrEmpty(configured)) {
+      return configured;
+    }
+    return ServerUtils.getDefaultRatisDirectory(conf);
+  }
+
+  /**
+   * Returns the leader peer ID recorded by the last role update, read under
+   * the read lock.
+   *
+   * @return the cached leader peer ID, or an empty Optional when none has
+   *         been recorded
+   */
+  public Optional<RaftPeerId> getCachedLeaderPeerId() {
+    this.roleCheckLock.readLock().lock();
+    try {
+      return cachedLeaderPeerId;
+    } finally {
+      this.roleCheckLock.readLock().unlock();
+    }
+  }
+
+  /** @return the port taken from the Ratis address passed at construction. */
+  public int getServerPort() {
+    return this.port;
+  }
+
+  /**
+   * Refreshes the cached role and leader ID from the Raft server's group
+   * info.
+   *
+   * <p>A leader records itself as the leader. A follower records the leader
+   * reported in its follower info, if any. Any other role records no leader.
+   * If the group info cannot be fetched, both cached values are cleared.
+   */
+  public void updateServerRole() {
+    try {
+      final RaftProtos.RoleInfoProto roleInfo =
+          getGroupInfo().getRoleInfoProto();
+      final RaftProtos.RaftPeerRole role = roleInfo.getRole();
+
+      switch (role) {
+      case LEADER:
+        setServerRole(role, raftPeerId);
+        break;
+      case FOLLOWER:
+        final ByteString leaderBytes = roleInfo.getFollowerInfo()
+            .getLeaderInfo().getId().getId();
+        // The leader ID can be missing or empty: for example, in a 3 node
+        // SCM Ratis ring with 2 SCM nodes down there is no leader.
+        RaftPeerId knownLeader = null;
+        if (leaderBytes != null && !leaderBytes.isEmpty()) {
+          knownLeader = RaftPeerId.valueOf(leaderBytes);
+        }
+        setServerRole(role, knownLeader);
+        break;
+      default:
+        setServerRole(role, null);
+        break;
+      }
+    } catch (IOException e) {
+      // NOTE(review): the message names UNRECOGNIZED, but the cache is
+      // cleared (role absent) rather than set to UNRECOGNIZED.
+      LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
+          "{} and resetting leader info.",
+          RaftProtos.RaftPeerRole.UNRECOGNIZED, e);
+      setServerRole(null, null);
+    }
+  }
+
+  /**
+   * Queries the local Raft server for the group info of this server's group,
+   * using a fresh call ID.
+   *
+   * @throws IOException propagated from the Raft server
+   */
+  private GroupInfoReply getGroupInfo() throws IOException {
+    final GroupInfoRequest request = new GroupInfoRequest(clientId,
+        raftPeerId, raftGroupId, nextCallId());
+    return server.getGroupInfo(request);
+  }
+
+  /**
+   * Replaces both cached values under the write lock. A null argument is
+   * stored as an empty Optional.
+   */
+  private void setServerRole(RaftProtos.RaftPeerRole currentRole,
+      RaftPeerId leaderPeerId) {
+    // Wrap outside the lock; only the two assignments need it.
+    final Optional<RaftProtos.RaftPeerRole> newRole =
+        Optional.ofNullable(currentRole);
+    final Optional<RaftPeerId> newLeader = Optional.ofNullable(leaderPeerId);
+    this.roleCheckLock.writeLock().lock();
+    try {
+      this.cachedPeerRole = newRole;
+      this.cachedLeaderPeerId = newLeader;
+    } finally {
+      this.roleCheckLock.writeLock().unlock();
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
new file mode 100644
index 000000000000..502260a9c8f6
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+/**
+ * Placeholder state machine for the SCM Ratis server. It does not yet add any
+ * behaviour of its own on top of {@link BaseStateMachine}.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+
+  /**
+   * TODO: to be implemented.
+   *
+   * @param ratisServer the Ratis server creating this state machine;
+   *                    currently ignored
+   */
+  public SCMStateMachine(SCMRatisServer ratisServer) {
+    // Nothing to set up yet.
+  }
+
+  /** Currently a no-op. */
+  public void stop() {
+    // Intentionally empty.
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9dcb8f2bc060..884710cab19c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -29,6 +29,7 @@
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.Collections;
import java.util.Objects;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,9 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -190,6 +194,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private CertificateServer certificateServer;
private GrpcTlsConfig grpcTlsConfig;
+ // SCM HA related
+ private SCMRatisServer scmRatisServer;
+
private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration;
private final SafeModeHandler safeModeHandler;
@@ -258,6 +265,12 @@ public StorageContainerManager(OzoneConfiguration conf,
loginAsSCMUser(conf);
}
+ if (SCMHAUtils.isSCMHAEnabled(conf)) {
+ initializeRatisServer();
+ } else {
+ scmRatisServer = null;
+ }
+
// Creates the SCM DBs or opens them if it exists.
// A valid pointer to the store is required by all the other services below.
initalizeMetadataStore(conf, configurator);
@@ -1107,4 +1120,18 @@ public SCMMetadataStore getScmMetadataStore() {
public NetworkTopology getClusterMap() {
return this.clusterMap;
}
+
+  /**
+   * Creates the SCM Ratis server as a stand-alone node with no peers, unless
+   * one already exists.
+   *
+   * @throws IOException if the node details or the Ratis server cannot be
+   *                     created
+   */
+  private void initializeRatisServer() throws IOException {
+    if (scmRatisServer != null) {
+      // Already initialized; keep the existing server.
+      return;
+    }
+    SCMNodeDetails scmNodeDetails = SCMNodeDetails
+        .initStandAlone(configuration);
+    //TODO enable Ratis ring
+    // Collections.emptyList() is the type-safe form of the raw EMPTY_LIST.
+    scmRatisServer = SCMRatisServer.newSCMRatisServer(configuration, this,
+        scmNodeDetails, Collections.emptyList());
+    // The factory returns a freshly constructed server or throws, so no null
+    // check is needed before logging.
+    LOG.info("SCM Ratis server initialized at port {}",
+        scmRatisServer.getServerPort());
+  }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
new file mode 100644
index 000000000000..f29fb5fed35b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ratis;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.LifeCycle;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+
+/**
+ * Test class for SCM Ratis Server.
+ */
+public class TestSCMRatisServer {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneConfiguration conf;
+  private SCMRatisServer scmRatisServer;
+  private StorageContainerManager scm;
+  private String scmId;
+  private SCMNodeDetails scmNodeDetails;
+  private static final long LEADER_ELECTION_TIMEOUT = 500L;
+
+  /** Starts an SCM plus a standalone Ratis server before every test. */
+  @Before
+  public void init() throws Exception {
+    conf = new OzoneConfiguration();
+    scmId = UUID.randomUUID().toString();
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = conf.getInt(ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY,
+        ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT);
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    scmNodeDetails = new SCMNodeDetails.Builder()
+        .setRatisPort(ratisPort)
+        .setRpcAddress(rpcAddress)
+        .setSCMNodeId(scmId)
+        .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT)
+        .build();
+
+    // Standalone SCM Ratis server
+    initSCM();
+    scm = HddsTestUtils.getScm(conf);
+    scm.start();
+    scmRatisServer = SCMRatisServer.newSCMRatisServer(
+        conf, scm, scmNodeDetails, Collections.emptyList());
+    scmRatisServer.start();
+  }
+
+  /** Stops whichever Ratis server the field references, then the SCM. */
+  @After
+  public void shutdown() {
+    if (scmRatisServer != null) {
+      scmRatisServer.stop();
+    }
+    if (scm != null) {
+      scm.stop();
+    }
+  }
+
+  /** The server started in init() must report the RUNNING state. */
+  @Test
+  public void testStartSCMRatisServer() throws Exception {
+    Assert.assertEquals("Ratis Server should be in running state",
+        LifeCycle.State.RUNNING, scmRatisServer.getServerState());
+  }
+
+  /** The Raft group id must be the name-based UUID of the SCM service id. */
+  @Test
+  public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws
+      Exception {
+    String customScmServiceId = "scmIdCustom123";
+    OzoneConfiguration newConf = new OzoneConfiguration();
+    String newScmId = UUID.randomUUID().toString();
+    String path = GenericTestUtils.getTempPath(newScmId);
+    Path metaDirPath = Paths.get(path, "scm-meta");
+    newConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+    newConf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+        LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    int ratisPort = 9873;
+    InetSocketAddress rpcAddress = new InetSocketAddress(
+        InetAddress.getLocalHost(), 0);
+    SCMNodeDetails nodeDetails = new SCMNodeDetails.Builder()
+        .setRpcAddress(rpcAddress)
+        .setRatisPort(ratisPort)
+        .setSCMNodeId(newScmId)
+        .setSCMServiceId(customScmServiceId)
+        .build();
+    // Single-node server; kept in the field so shutdown() always stops it.
+    scmRatisServer.stop();
+    scmRatisServer = SCMRatisServer.newSCMRatisServer(
+        newConf, scm, nodeDetails, Collections.emptyList());
+    scmRatisServer.start();
+
+    UUID uuid = UUID.nameUUIDFromBytes(customScmServiceId.getBytes());
+    RaftGroupId raftGroupId = scmRatisServer.getRaftGroup().getGroupId();
+    Assert.assertEquals(uuid, raftGroupId.getUuid());
+    Assert.assertEquals(16, raftGroupId.toByteString().size());
+  }
+
+  /** Initializes on-disk SCM storage under a fresh temporary folder. */
+  private void initSCM() throws IOException {
+    String clusterId = UUID.randomUUID().toString();
+    // scmId was chosen in init(); reuse it so storage and scmNodeDetails agree.
+    Path scmPath = Paths.get(folder.newFolder().toString(), "scm-meta");
+    Files.createDirectories(scmPath);
+    conf.set(OZONE_METADATA_DIRS, scmPath.toString());
+    SCMStorageConfig scmStore = new SCMStorageConfig(conf);
+    scmStore.setClusterId(clusterId);
+    scmStore.setScmId(scmId);
+    // writes the version file properties
+    scmStore.initialize();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 49f7f8db14f7..b3a9600bd58a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -44,6 +44,7 @@ public void initializeMemberVariables() {
errorIfMissingXmlProps = true;
xmlPropsToSkipCompare.add("hadoop.tags.custom");
xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID");
+ xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID");
addPropertiesNotInXml();
}
@@ -55,6 +56,7 @@ private void addPropertiesNotInXml() {
HddsConfigKeys.HDDS_SECURITY_PROVIDER,
HddsConfigKeys.HDDS_X509_CRL_NAME, // HDDS-2873
OMConfigKeys.OZONE_OM_NODES_KEY,
+ ScmConfigKeys.OZONE_SCM_NODES_KEY,
OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE,
OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY,
ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR