diff --git a/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
new file mode 100644
index 000000000000..3571a8929e3f
--- /dev/null
+++ b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<FindBugsFilter>
+  <!-- Exclude the generated protobuf sources from spotbugs analysis. -->
+  <Match>
+    <Package name="org.apache.hadoop.hdds.protocol.proto"/>
+  </Match>
+</FindBugsFilter>
diff --git a/hadoop-hdds/server-scm/pom.xml b/hadoop-hdds/server-scm/pom.xml
index dcbc42a17d52..8c17aaef4566 100644
--- a/hadoop-hdds/server-scm/pom.xml
+++ b/hadoop-hdds/server-scm/pom.xml
@@ -128,6 +128,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <artifactId>hadoop-hdds-hadoop-dependency-test</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>compile</scope>
+    </dependency>
@@ -163,6 +168,39 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>SCMRatisProtocol.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
new file mode 100644
index 000000000000..0404530b2f1f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link ContainerManagerV2}, backed by a
+ * {@link ContainerStateManagerV2} that owns the container state.
+ */
+public class ContainerManagerImpl implements ContainerManagerV2 {
+
+ /** Class logger. */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ContainerManagerImpl.class);
+
+ /** Lock protecting reads and writes of the container state. */
+ private final ReadWriteLock lock;
+
+ /** Pipeline manager, used for container-to-pipeline bookkeeping. */
+ private final PipelineManager pipelineManager;
+
+ /** State manager that owns the container states. */
+ private final ContainerStateManagerV2 containerStateManager;
+
+ /** Creates a ContainerManagerImpl and its backing state manager. */
+ public ContainerManagerImpl(
+ // Introduce builder for this class?
+ final Configuration conf, final PipelineManager pipelineManager,
+ final SCMHAManager scmhaManager,
+ final Table<ContainerID, ContainerInfo> containerStore)
+ throws IOException {
+ this.lock = new ReentrantReadWriteLock();
+ this.pipelineManager = pipelineManager;
+ this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(containerStore)
+ .build();
+ }
+
+ @Override
+ public Set<ContainerID> getContainerIDs() {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Set<ContainerInfo> getContainers() {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs().stream().map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException e) {
+ // Should not happen while we hold the read lock; skip defensively.
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toSet());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public ContainerInfo getContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainer(containerID);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Set<ContainerInfo> getContainers(final LifeCycleState state) {
+ lock.readLock().lock();
+ try {
+ return containerStateManager.getContainerIDs(state).stream().map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException e) {
+ // Should not happen while we hold the read lock; skip defensively.
+ return null;
+ }
+ }).filter(Objects::nonNull).collect(Collectors.toSet());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean exists(final ContainerID containerID) {
+ lock.readLock().lock();
+ try {
+ return (containerStateManager.getContainer(containerID) != null);
+ } catch (ContainerNotFoundException ex) {
+ return false;
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<ContainerInfo> listContainers(final ContainerID startID,
+ final int count) {
+ lock.readLock().lock();
+ try {
+ final long startId = startID == null ? 0 : startID.getId();
+ final List<ContainerID> containersIds =
+ new ArrayList<>(containerStateManager.getContainerIDs());
+ Collections.sort(containersIds);
+ return containersIds.stream()
+ .filter(id -> id.getId() > startId)
+ .limit(count)
+ .map(id -> {
+ try {
+ return containerStateManager.getContainer(id);
+ } catch (ContainerNotFoundException ex) {
+ // This can never happen, as we hold lock no one else can remove
+ // the container after we got the container ids.
+ LOG.warn("Container Missing.", ex);
+ return null;
+ }
+ }).collect(Collectors.toList());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public ContainerInfo allocateContainer(final ReplicationType type,
+ final ReplicationFactor replicationFactor, final String owner)
+ throws IOException {
+ lock.writeLock().lock();
+ try {
+ final List<Pipeline> pipelines = pipelineManager
+ .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+
+ if (pipelines.isEmpty()) {
+ throw new IOException("Could not allocate container. Cannot get any" +
+ " matching pipeline for Type:" + type + ", Factor:" +
+ replicationFactor + ", State:PipelineState.OPEN");
+ }
+
+ final ContainerID containerID = containerStateManager
+ .getNextContainerID();
+ final Pipeline pipeline = pipelines.get(
+ (int) (containerID.getId() % pipelines.size()));
+
+ final ContainerInfoProto containerInfo = ContainerInfoProto.newBuilder()
+ .setState(LifeCycleState.OPEN)
+ .setPipelineID(pipeline.getId().getProtobuf())
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setStateEnterTime(Time.now())
+ .setOwner(owner)
+ .setContainerID(containerID.getId())
+ .setDeleteTransactionId(0)
+ .setReplicationFactor(pipeline.getFactor())
+ .setReplicationType(pipeline.getType())
+ .build();
+ containerStateManager.addContainer(containerInfo);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("New container allocated: {}", containerInfo);
+ }
+ return containerStateManager.getContainer(containerID);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void deleteContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateContainerState(final ContainerID containerID,
+ final LifeCycleEvent event)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public Set<ContainerReplica> getContainerReplicas(
+ final ContainerID containerID) throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void removeContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void updateDeleteTransactionId(
+ final Map<Long, Long> deleteTransactionMap) throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public ContainerInfo getMatchingContainer(final long size, final String owner,
+ final Pipeline pipeline, final List<ContainerID> excludedContainerIDS) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void notifyContainerReportProcessing(final boolean isFullReport,
+ final boolean success) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+}
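
A rough usage sketch of the manager above. The configuration, pipeline manager, HA manager and container table would come from SCM start-up code, which is outside this patch; the local variable names here are assumptions.

    // Illustrative wiring only.
    final ContainerManagerV2 containerManager = new ContainerManagerImpl(
        conf, pipelineManager, scmhaManager, containerStore);

    // Allocation picks an OPEN pipeline of the requested type/factor and
    // registers a new OPEN container on it.
    final ContainerInfo container = containerManager.allocateContainer(
        ReplicationType.RATIS, ReplicationFactor.THREE, "ozone");
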
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
new file mode 100644
index 000000000000..37c7b709d458
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
+/**
+ * TODO: Add extensive javadoc.
+ *
+ * ContainerManager maintains the mapping from containers to pipelines and
+ * tracks container state. SCM uses it when allocating new container
+ * locations and when looking up a key.
+ */
+public interface ContainerManagerV2 extends Closeable {
+
+
+ /**
+ * Returns all the container Ids managed by ContainerManager.
+ *
+ * @return Set of ContainerID
+ */
+ Set<ContainerID> getContainerIDs();
+
+ /**
+ * Returns all the containers managed by ContainerManager.
+ *
+ * @return Set of ContainerInfo
+ */
+ Set<ContainerInfo> getContainers();
+
+ /**
+ * Returns all the containers which are in the specified state.
+ *
+ * @return Set of ContainerInfo in the given state
+ */
+ Set<ContainerInfo> getContainers(LifeCycleState state);
+
+ /**
+ * Returns the ContainerInfo from the container ID.
+ *
+ */
+ ContainerInfo getContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ boolean exists(ContainerID containerID);
+
+ /**
+ * Returns up to count containers whose ID is greater than startID.
+ * The search starts from startID (exclusive) and the size of the
+ * result cannot exceed the value of count.
+ *
+ * @param startID start containerID, >= 0;
+ *                start searching at the head if 0 or null.
+ * @param count maximum number of containers to return; must be >= 0.
+ *              Callers usually pass a large bound here rather than an
+ *              unlimited one, in case the db is very big.
+ *
+ * @return a list of containers.
+ */
+ List<ContainerInfo> listContainers(ContainerID startID, int count);
+
+ /**
+ * Allocates a new container with the given replication type and factor.
+ *
+ * @param type - replication type of the container.
+ * @param replicationFactor - replication factor of the container.
+ * @param owner - owner of the container.
+ * @return - ContainerInfo.
+ * @throws IOException
+ */
+ ContainerInfo allocateContainer(ReplicationType type,
+ ReplicationFactor replicationFactor,
+ String owner) throws IOException;
+
+ /**
+ * Deletes a container from SCM.
+ *
+ * @param containerID - Container ID
+ * @throws ContainerNotFoundException - if the container does not exist.
+ */
+ void deleteContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ * Update container state.
+ * @param containerID - Container ID
+ * @param event - container life cycle event
+ * @throws ContainerNotFoundException - if the container does not exist.
+ */
+ void updateContainerState(ContainerID containerID,
+ LifeCycleEvent event)
+ throws ContainerNotFoundException;
+
+ /**
+ * Returns the latest list of replicas for given containerId.
+ *
+ * @param containerID Container ID
+ * @return Set of ContainerReplica
+ */
+ Set getContainerReplicas(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /**
+ * Adds or updates a container Replica for the given Container.
+ *
+ * @param containerID Container ID
+ * @param replica ContainerReplica
+ */
+ void updateContainerReplica(ContainerID containerID, ContainerReplica replica)
+ throws ContainerNotFoundException;
+
+ /**
+ * Removes a container Replica from the given Container.
+ *
+ * @param containerID Container ID
+ * @param replica ContainerReplica
+ */
+ void removeContainerReplica(ContainerID containerID, ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException;
+
+ /**
+ * Update deleteTransactionId according to deleteTransactionMap.
+ *
+ * @param deleteTransactionMap Maps the containerId to latest delete
+ * transaction id for the container.
+ * @throws IOException
+ */
+ void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
+ throws IOException;
+
+ /**
+ * Returns ContainerInfo which matches the requirements.
+ * @param size - the amount of space required in the container
+ * @param owner - the user which requires space in its owned container
+ * @param pipeline - pipeline to which the container should belong
+ * @return ContainerInfo for the matching container.
+ */
+ default ContainerInfo getMatchingContainer(long size, String owner,
+ Pipeline pipeline) {
+ return getMatchingContainer(size, owner, pipeline, Collections.emptyList());
+ }
+
+ /**
+ * Returns ContainerInfo which matches the requirements.
+ * @param size - the amount of space required in the container
+ * @param owner - the user which requires space in its owned container
+ * @param pipeline - pipeline to which the container should belong.
+ * @param excludedContainerIDS - containerIds to be excluded.
+ * @return ContainerInfo for the matching container.
+ */
+ ContainerInfo getMatchingContainer(long size, String owner,
+ Pipeline pipeline,
+ List<ContainerID> excludedContainerIDS);
+
+ /**
+ * Called once the report processor handler completes, so that the
+ * container manager can update its metrics.
+ * @param isFullReport true if a full container report was processed
+ * @param success true if the report was processed successfully
+ */
+ // Is it possible to remove this from the Interface?
+ void notifyContainerReportProcessing(boolean isFullReport, boolean success);
+}
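
The listContainers contract above (exclusive start ID, bounded count) lends itself to simple paging. A sketch, assuming a containerManager reference and a placeholder process() callback:

    // Page through all containers in batches of 100; a null cursor starts
    // the search at the head.
    ContainerID cursor = null;
    List<ContainerInfo> batch;
    do {
      batch = containerManager.listContainers(cursor, 100);
      for (ContainerInfo container : batch) {
        process(container);  // placeholder for the caller's logic
      }
      if (!batch.isEmpty()) {
        cursor = batch.get(batch.size() - 1).containerID();
      }
    } while (batch.size() == 100);
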
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
new file mode 100644
index 000000000000..16fe3407bde4
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+import java.lang.reflect.Proxy;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.states.ContainerState;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+
+/**
+ * Default implementation of {@link ContainerStateManagerV2}. It keeps the
+ * container states in memory and persists them to the container store,
+ * while mutations are replicated through Ratis.
+ */
+public final class ContainerStateManagerImpl
+ implements ContainerStateManagerV2 {
+
+ /* **********************************************************************
+ * Container Life Cycle *
+ * *
+ * Event and State Transition Mapping: *
+ * *
+ * State: OPEN ----------------> CLOSING *
+ * Event: FINALIZE *
+ * *
+ * State: CLOSING ----------------> QUASI_CLOSED *
+ * Event: QUASI_CLOSE *
+ * *
+ * State: CLOSING ----------------> CLOSED *
+ * Event: CLOSE *
+ * *
+ * State: QUASI_CLOSED ----------------> CLOSED *
+ * Event: FORCE_CLOSE *
+ * *
+ * State: CLOSED ----------------> DELETING *
+ * Event: DELETE *
+ * *
+ * State: DELETING ----------------> DELETED *
+ * Event: CLEANUP *
+ * *
+ * *
+ * Container State Flow: *
+ * *
+ * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
+ * (FINALIZE) | (QUASI_CLOSE) | *
+ * | | *
+ * | | *
+ * (CLOSE) | (FORCE_CLOSE) | *
+ * | | *
+ * | | *
+ * +--------->[CLOSED]<--------+ *
+ * | *
+ * (DELETE)| *
+ * | *
+ * | *
+ * [DELETING] *
+ * | *
+ * (CLEANUP) | *
+ * | *
+ * V *
+ * [DELETED] *
+ * *
+ ************************************************************************/
+
+ /** Class logger. */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ContainerStateManagerImpl.class);
+
+ /** Configured maximum container size, in bytes. */
+ private final long containerSize;
+
+ /** The ID that will be assigned to the next allocated container. */
+ private final AtomicLong nextContainerID;
+
+ /** In-memory map of container states and replicas. */
+ private final ContainerStateMap containers;
+
+ /** Pipeline manager, used to map containers to pipelines. */
+ private final PipelineManager pipelineManager;
+
+ /** Persistent store of container info. */
+ private Table<ContainerID, ContainerInfo> containerStore;
+
+ /** State machine describing the container life cycle transitions. */
+ private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
+
+ /** Last used container for each container state key. */
+ private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+
+ /** Creates the state manager and loads existing containers from the store. */
+ private ContainerStateManagerImpl(final Configuration conf,
+ final PipelineManager pipelineManager,
+ final Table<ContainerID, ContainerInfo> containerStore)
+ throws IOException {
+ this.pipelineManager = pipelineManager;
+ this.containerStore = containerStore;
+ this.stateMachine = newStateMachine();
+ this.containerSize = getConfiguredContainerSize(conf);
+ this.nextContainerID = new AtomicLong();
+ this.containers = new ContainerStateMap();
+ this.lastUsedMap = new ConcurrentHashMap<>();
+
+ initialize();
+ }
+
+ /** Builds the container life cycle state machine shown above. */
+ private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
+
+ final Set<LifeCycleState> finalStates = new HashSet<>();
+
+ // These are the steady states of a container.
+ finalStates.add(LifeCycleState.OPEN);
+ finalStates.add(LifeCycleState.CLOSED);
+ finalStates.add(LifeCycleState.DELETED);
+
+ final StateMachine<LifeCycleState, LifeCycleEvent> containerLifecycleSM =
+ new StateMachine<>(LifeCycleState.OPEN, finalStates);
+
+ containerLifecycleSM.addTransition(LifeCycleState.OPEN,
+ LifeCycleState.CLOSING,
+ LifeCycleEvent.FINALIZE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+ LifeCycleState.QUASI_CLOSED,
+ LifeCycleEvent.QUASI_CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+ LifeCycleState.CLOSED,
+ LifeCycleEvent.CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.QUASI_CLOSED,
+ LifeCycleState.CLOSED,
+ LifeCycleEvent.FORCE_CLOSE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.CLOSED,
+ LifeCycleState.DELETING,
+ LifeCycleEvent.DELETE);
+
+ containerLifecycleSM.addTransition(LifeCycleState.DELETING,
+ LifeCycleState.DELETED,
+ LifeCycleEvent.CLEANUP);
+
+ return containerLifecycleSM;
+ }
+
+ /** Returns the configured container size. */
+ private long getConfiguredContainerSize(final Configuration conf) {
+ return (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ }
+
+ /** Loads persisted containers and rebuilds the in-memory state. */
+ private void initialize() throws IOException {
+ TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
+ iterator = containerStore.iterator();
+
+ while (iterator.hasNext()) {
+ final ContainerInfo container = iterator.next().getValue();
+ Preconditions.checkNotNull(container);
+ containers.addContainer(container);
+ nextContainerID.set(Long.max(container.containerID().getId(),
+ nextContainerID.get()));
+ if (container.getState() == LifeCycleState.OPEN) {
+ try {
+ pipelineManager.addContainerToPipeline(container.getPipelineID(),
+ ContainerID.valueof(container.getContainerID()));
+ } catch (PipelineNotFoundException ex) {
+ LOG.warn("Found container {} which is in OPEN state with " +
+ "pipeline {} that does not exist. Marking container for " +
+ "closing.", container, container.getPipelineID());
+ updateContainerState(container.containerID(),
+ LifeCycleEvent.FINALIZE);
+ }
+ }
+ }
+ }
+
+ @Override
+ public ContainerID getNextContainerID() {
+ return ContainerID.valueof(nextContainerID.get());
+ }
+
+ @Override
+ public Set getContainerIDs() {
+ return containers.getAllContainerIDs();
+ }
+
+ @Override
+ public Set getContainerIDs(final LifeCycleState state) {
+ return containers.getContainerIDsByState(state);
+ }
+
+ @Override
+ public ContainerInfo getContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ return containers.getContainerInfo(containerID);
+ }
+
+ @Override
+ public Set getContainerReplicas(
+ final ContainerID containerID) throws ContainerNotFoundException {
+ return containers.getContainerReplicas(containerID);
+ }
+
+ @Override
+ public void addContainer(final ContainerInfoProto containerInfo)
+ throws IOException {
+
+ // Change the exception thrown to PipelineNotFound and
+ // ClosedPipelineException once ClosedPipelineException is introduced
+ // in PipelineManager.
+
+ Preconditions.checkNotNull(containerInfo);
+ final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
+ if (getContainer(container.containerID()) == null) {
+ Preconditions.checkArgument(nextContainerID.get()
+ == container.containerID().getId(),
+ "ContainerID mismatch.");
+
+ pipelineManager.addContainerToPipeline(
+ container.getPipelineID(), container.containerID());
+ containers.addContainer(container);
+ nextContainerID.incrementAndGet();
+ }
+ }
+
+ void updateContainerState(final ContainerID containerID,
+ final LifeCycleEvent event)
+ throws IOException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ void updateContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException {
+ containers.updateContainerReplica(containerID, replica);
+ }
+
+
+ void updateDeleteTransactionId(
+ final Map deleteTransactionMap) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ ContainerInfo getMatchingContainer(final long size, String owner,
+ PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ NavigableSet<ContainerID> getMatchingContainerIDs(final String owner,
+ final ReplicationType type, final ReplicationFactor factor,
+ final LifeCycleState state) {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ void removeContainerReplica(final ContainerID containerID,
+ final ContainerReplica replica)
+ throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+
+ void removeContainer(final ContainerID containerID)
+ throws ContainerNotFoundException {
+ throw new UnsupportedOperationException("Not yet implemented!");
+ }
+
+ void close() throws IOException {
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for ContainerStateManager.
+ */
+ public static class Builder {
+ private Configuration conf;
+ private PipelineManager pipelineMgr;
+ private SCMRatisServer scmRatisServer;
+ private Table<ContainerID, ContainerInfo> table;
+
+ public Builder setConfiguration(final Configuration config) {
+ conf = config;
+ return this;
+ }
+
+ public Builder setPipelineManager(final PipelineManager pipelineManager) {
+ pipelineMgr = pipelineManager;
+ return this;
+ }
+
+ public Builder setRatisServer(final SCMRatisServer ratisServer) {
+ scmRatisServer = ratisServer;
+ return this;
+ }
+
+ public Builder setContainerStore(
+ final Table<ContainerID, ContainerInfo> containerStore) {
+ table = containerStore;
+ return this;
+ }
+
+ public ContainerStateManagerV2 build() throws IOException {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkNotNull(pipelineMgr);
+ Preconditions.checkNotNull(scmRatisServer);
+ Preconditions.checkNotNull(table);
+
+ final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
+ conf, pipelineMgr, table);
+ scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);
+
+ final SCMHAInvocationHandler invocationHandler =
+ new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
+ scmRatisServer);
+
+ return (ContainerStateManagerV2) Proxy.newProxyInstance(
+ SCMHAInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{ContainerStateManagerV2.class}, invocationHandler);
+ }
+
+ }
+}
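
Builder.build() above returns a JDK dynamic proxy rather than the raw ContainerStateManagerImpl, so every interface call is routed through SCMHAInvocationHandler. The pattern in isolation, as a self-contained sketch with toy types:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;

    interface Service {
      String echo(String s);
    }

    public final class ProxyDemo {
      public static void main(String[] args) {
        final Service target = s -> s;
        // A real handler would route @Replicate methods to Ratis here.
        final InvocationHandler handler = (proxy, method, methodArgs) -> {
          System.out.println("intercepted: " + method.getName());
          return method.invoke(target, methodArgs);
        };
        final Service proxied = (Service) Proxy.newProxyInstance(
            Service.class.getClassLoader(),
            new Class<?>[]{Service.class}, handler);
        System.out.println(proxied.echo("hello"));  // prints "hello"
      }
    }
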
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
new file mode 100644
index 000000000000..9960354be402
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ *
+ * TODO: Add proper javadoc.
+ *
+ * Implementations of methods marked with the {@code @Replicate} annotation
+ * should follow these rules:
+ *
+ * 1. They should be idempotent
+ * 2. Arguments should be protobuf objects
+ * 3. The return type should be a protobuf object
+ * 4. The declaration should throw RaftException
+ *
+ */
+public interface ContainerStateManagerV2 {
+
+ /** Returns the ID that will be assigned to the next container. */
+ ContainerID getNextContainerID();
+
+ /** Returns the IDs of all containers. */
+ Set<ContainerID> getContainerIDs();
+
+ /** Returns the IDs of all containers in the given state. */
+ Set<ContainerID> getContainerIDs(LifeCycleState state);
+
+ /** Returns the ContainerInfo of the given container. */
+ ContainerInfo getContainer(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /** Returns the replicas of the given container. */
+ Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+ throws ContainerNotFoundException;
+
+ /** Adds a new container; this mutation is replicated via Ratis. */
+ @Replicate
+ void addContainer(ContainerInfoProto containerInfo)
+ throws IOException;
+
+}
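
A method honoring this contract looks like addContainer above. A second, purely hypothetical example (not part of this patch) for illustration:

    // Hypothetical: the argument types are protobuf objects, the operation is
    // idempotent (re-applying the same event is a no-op), and replication
    // failures surface through the declared exception.
    @Replicate
    ContainerInfoProto updateContainerState(ContainerInfoProto container,
        LifeCycleEvent event) throws IOException;
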
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
new file mode 100644
index 000000000000..1bc16974362f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.Log;
+import static org.apache.ratis.server.RaftServerConfigKeys.RetryCache;
+import static org.apache.ratis.server.RaftServerConfigKeys.Rpc;
+import static org.apache.ratis.server.RaftServerConfigKeys.Snapshot;
+
+/**
+ * Ratis Util for SCM HA.
+ */
+public final class RatisUtil {
+
+ private RatisUtil() {
+ }
+
+
+ /**
+ * Constructs new Raft Properties instance using {@link SCMHAConfiguration}.
+ * @param haConf SCMHAConfiguration
+ * @param conf ConfigurationSource
+ */
+ public static RaftProperties newRaftProperties(
+ final SCMHAConfiguration haConf, final ConfigurationSource conf) {
+ //TODO: Remove ConfigurationSource!
+ // TODO: Check the default values.
+ final RaftProperties properties = new RaftProperties();
+ setRaftStorageDir(properties, haConf, conf);
+ setRaftRpcProperties(properties, haConf);
+ setRaftLogProperties(properties, haConf);
+ setRaftRetryCacheProperties(properties, haConf);
+ setRaftSnapshotProperties(properties, haConf);
+ return properties;
+ }
+
+ /**
+ * Set the local directory where ratis logs will be stored.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param haConf SCMHAConfiguration
+ * @param conf ConfigurationSource
+ */
+ public static void setRaftStorageDir(final RaftProperties properties,
+ final SCMHAConfiguration haConf,
+ final ConfigurationSource conf) {
+ String storageDir = haConf.getRatisStorageDir();
+ if (Strings.isNullOrEmpty(storageDir)) {
+ storageDir = ServerUtils.getDefaultRatisDirectory(conf);
+ }
+ RaftServerConfigKeys.setStorageDir(properties,
+ Collections.singletonList(new File(storageDir)));
+ }
+
+ /**
+ * Set properties related to Raft RPC.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftRpcProperties(final RaftProperties properties,
+ final SCMHAConfiguration conf) {
+ RaftConfigKeys.Rpc.setType(properties,
+ RpcType.valueOf(conf.getRatisRpcType()));
+ GrpcConfigKeys.Server.setPort(properties,
+ conf.getRatisBindAddress().getPort());
+ GrpcConfigKeys.setMessageSizeMax(properties,
+ SizeInBytes.valueOf("32m"));
+
+ Rpc.setRequestTimeout(properties, TimeDuration.valueOf(
+ conf.getRatisRequestTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setTimeoutMin(properties, TimeDuration.valueOf(
+ conf.getRatisRequestMinTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setTimeoutMax(properties, TimeDuration.valueOf(
+ conf.getRatisRequestMaxTimeout(), TimeUnit.MILLISECONDS));
+ Rpc.setSlownessTimeout(properties, TimeDuration.valueOf(
+ conf.getRatisNodeFailureTimeout(), TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Set properties related to Raft Log.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftLogProperties(final RaftProperties properties,
+ final SCMHAConfiguration conf) {
+ Log.setSegmentSizeMax(properties,
+ SizeInBytes.valueOf(conf.getRaftSegmentSize()));
+ Log.Appender.setBufferElementLimit(properties,
+ conf.getRaftLogAppenderQueueNum());
+ Log.Appender.setBufferByteLimit(properties,
+ SizeInBytes.valueOf(conf.getRaftLogAppenderQueueByteLimit()));
+ Log.setPreallocatedSize(properties,
+ SizeInBytes.valueOf(conf.getRaftSegmentPreAllocatedSize()));
+ Log.Appender.setInstallSnapshotEnabled(properties, false);
+ Log.setPurgeGap(properties, conf.getRaftLogPurgeGap());
+ Log.setSegmentCacheNumMax(properties, 2);
+ }
+
+ /**
+ * Set properties related to Raft Retry Cache.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftRetryCacheProperties(
+ final RaftProperties properties, final SCMHAConfiguration conf) {
+ RetryCache.setExpiryTime(properties, TimeDuration.valueOf(
+ conf.getRatisRetryCacheTimeout(), TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * Set properties related to Raft Snapshot.
+ *
+ * @param properties RaftProperties instance which will be updated
+ * @param conf SCMHAConfiguration
+ */
+ private static void setRaftSnapshotProperties(
+ final RaftProperties properties, final SCMHAConfiguration conf) {
+ Snapshot.setAutoTriggerEnabled(properties, true);
+ Snapshot.setAutoTriggerThreshold(properties, 400000);
+ }
+
+}
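
The expected call site for newRaftProperties is the Ratis server construction path. A sketch, where conf is the SCM ConfigurationSource:

    // Materialize the HA config group, derive RaftProperties from it, then
    // hand those to the Ratis server builder (server wiring elided).
    final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
    final RaftProperties properties = RatisUtil.newRaftProperties(haConf, conf);
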
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
new file mode 100644
index 000000000000..7c54723d7470
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Reflection util for SCM HA.
+ */
+public final class ReflectionUtil {
+
+ private static Map<String, Class<?>> classCache = new HashMap<>();
+
+ private ReflectionUtil() {
+ }
+
+ /**
+ * Returns the {@code Class} object associated with the given string name.
+ *
+ * @param className the fully qualified name of the desired class.
+ * @return the {@code Class} object for the class with the
+ * specified name.
+ * @throws ClassNotFoundException if the class cannot be located
+ */
+ public static Class<?> getClass(String className)
+ throws ClassNotFoundException {
+ if (!classCache.containsKey(className)) {
+ classCache.put(className, Class.forName(className));
+ }
+ return classCache.get(className);
+ }
+
+ /**
+ * Returns a {@code Method} object that reflects the specified public
+ * member method of the given {@code Class} object.
+ *
+ * @param clazz the class object which has the method
+ * @param methodName the name of the method
+ * @param arg the list of parameters
+ * @return the {@code Method} object that matches the specified
+ * {@code name} and {@code parameterTypes}
+ * @throws NoSuchMethodException if a matching method is not found
+ * or if the name is "<init>" or "<clinit>".
+ */
+ public static Method getMethod(
+ final Class<?> clazz, final String methodName, final Class<?>... arg)
+ throws NoSuchMethodException {
+ return clazz.getMethod(methodName, arg);
+ }
+}
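
These helpers are presumably used on the state-machine side to turn a decoded SCMRatisRequest back into a local method call. A sketch under that assumption; handler and containerInfoProto are hypothetical local variables:

    // Resolve the target method by name and invoke it on the local handler.
    final Class<?> clazz = ReflectionUtil.getClass(
        "org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2");
    final Method method = ReflectionUtil.getMethod(
        clazz, "addContainer", ContainerInfoProto.class);
    method.invoke(handler, containerInfoProto);
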
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
new file mode 100644
index 000000000000..1cb8c65675f8
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Configuration used by SCM HA.
+ */
+@ConfigGroup(prefix = "ozone.scm.ha")
+public class SCMHAConfiguration {
+
+ @Config(key = "ratis.storage.dir",
+ type = ConfigType.STRING,
+ defaultValue = "",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Storage directory used by SCM to write Ratis logs."
+ )
+ private String ratisStorageDir;
+
+ @Config(key = "ratis.bind.host",
+ type = ConfigType.STRING,
+ defaultValue = "0.0.0.0",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Host used by SCM for binding Ratis Server."
+ )
+ private String ratisBindHost = "0.0.0.0";
+
+ @Config(key = "ratis.bind.port",
+ type = ConfigType.INT,
+ defaultValue = "9865",
+ tags = {OZONE, SCM, HA, RATIS},
+ description = "Port used by SCM for Ratis Server."
+ )
+ private int ratisBindPort = 9865;
+
+
+ @Config(key = "ratis.rpc.type",
+ type = ConfigType.STRING,
+ defaultValue = "GRPC",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Ratis supports different kinds of transports like" +
+ " netty, GRPC, Hadoop RPC etc. This picks one of those for" +
+ " this cluster."
+ )
+ private String ratisRpcType;
+
+ @Config(key = "ratis.segment.size",
+ type = ConfigType.SIZE,
+ defaultValue = "16KB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The size of the raft segment used by Apache Ratis on" +
+ " SCM. (16 KB by default)"
+ )
+ private long raftSegmentSize = 16L * 1024L;
+
+ @Config(key = "ratis.segment.preallocated.size",
+ type = ConfigType.SIZE,
+ defaultValue = "16KB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The size of the buffer which is preallocated for" +
+ " raft segment used by Apache Ratis on SCM.(16 KB by default)"
+ )
+ private long raftSegmentPreAllocatedSize = 16 * 1024;
+
+ @Config(key = "ratis.log.appender.queue.num-elements",
+ type = ConfigType.INT,
+ defaultValue = "1024",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Number of operation pending with Raft's Log Worker."
+ )
+ private int raftLogAppenderQueueNum = 1024;
+
+ @Config(key = "ratis.log.appender.queue.byte-limit",
+ type = ConfigType.SIZE,
+ defaultValue = "32MB",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Byte limit for Raft's Log Worker queue."
+ )
+ private int raftLogAppenderQueueByteLimit = 32 * 1024 * 1024;
+
+ @Config(key = "ratis.log.purge.gap",
+ type = ConfigType.INT,
+ defaultValue = "1000000",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The minimum gap between log indices for Raft server to" +
+ " purge its log segments after taking snapshot."
+ )
+ private int raftLogPurgeGap = 1000000;
+
+ @Config(key = "ratis.request.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "3000ms",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The timeout duration for SCM's Ratis server RPC."
+ )
+ private long ratisRequestTimeout = 3000L;
+
+ @Config(key = "ratis.server.retry.cache.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "60s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "Retry Cache entry timeout for SCM's ratis server."
+ )
+ private long ratisRetryCacheTimeout = 60 * 1000L;
+
+
+ @Config(key = "ratis.leader.election.timeout",
+ type = ConfigType.TIME,
+ defaultValue = "1s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The minimum timeout duration for SCM ratis leader" +
+ " election. Default is 1s."
+ )
+ private long ratisLeaderElectionTimeout = 1 * 1000L;
+
+ @Config(key = "ratis.server.failure.timeout.duration",
+ type = ConfigType.TIME,
+ defaultValue = "120s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The timeout duration for ratis server failure" +
+ " detection, once the threshold has reached, the ratis state" +
+ " machine will be informed about the failure in the ratis ring."
+ )
+ private long ratisNodeFailureTimeout = 120 * 1000L;
+
+ @Config(key = "ratis.server.role.check.interval",
+ type = ConfigType.TIME,
+ defaultValue = "15s",
+ tags = {SCM, OZONE, HA, RATIS},
+ description = "The interval between SCM leader performing a role" +
+ " check on its ratis server. Ratis server informs SCM if it loses" +
+ " the leader role. The scheduled check is an secondary check to" +
+ " ensure that the leader role is updated periodically"
+ )
+ private long ratisRoleCheckerInterval = 15 * 1000L;
+
+ public String getRatisStorageDir() {
+ return ratisStorageDir;
+ }
+
+ public InetSocketAddress getRatisBindAddress() {
+ return NetUtils.createSocketAddr(ratisBindHost, ratisBindPort);
+ }
+
+ public String getRatisRpcType() {
+ return ratisRpcType;
+ }
+
+ public long getRaftSegmentSize() {
+ return raftSegmentSize;
+ }
+
+ public long getRaftSegmentPreAllocatedSize() {
+ return raftSegmentPreAllocatedSize;
+ }
+
+ public int getRaftLogAppenderQueueNum() {
+ return raftLogAppenderQueueNum;
+ }
+
+ public int getRaftLogAppenderQueueByteLimit() {
+ return raftLogAppenderQueueByteLimit;
+ }
+
+ public int getRaftLogPurgeGap() {
+ return raftLogPurgeGap;
+ }
+
+ public long getRatisRetryCacheTimeout() {
+ return ratisRetryCacheTimeout;
+ }
+
+ public long getRatisRequestTimeout() {
+ Preconditions.checkArgument(ratisRequestTimeout > 1000L,
+ "Ratis request timeout cannot be less than 1000ms.");
+ return ratisRequestTimeout;
+ }
+
+ public long getRatisRequestMinTimeout() {
+ return ratisRequestTimeout - 1000L;
+ }
+
+ public long getRatisRequestMaxTimeout() {
+ return ratisRequestTimeout + 1000L;
+ }
+
+ public long getRatisLeaderElectionTimeout() {
+ return ratisLeaderElectionTimeout;
+ }
+
+ public long getRatisNodeFailureTimeout() {
+ return ratisNodeFailureTimeout;
+ }
+
+ public long getRatisRoleCheckerInterval() {
+ return ratisRoleCheckerInterval;
+ }
+}
\ No newline at end of file
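
Because the class is a @ConfigGroup with prefix ozone.scm.ha, instances are materialized from the configuration source rather than constructed by hand, and each key resolves as ozone.scm.ha.<key>. A sketch:

    // e.g. ozone.scm.ha.ratis.bind.port controls the Ratis server port.
    final SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
    final InetSocketAddress ratisAddress = haConf.getRatisBindAddress();
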
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
new file mode 100644
index 000000000000..c78c6161ac9c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InvocationHandler which checks for {@link Replicate} annotation and
+ * dispatches the request to Ratis Server.
+ */
+public class SCMHAInvocationHandler implements InvocationHandler {
+
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SCMHAInvocationHandler.class);
+
+ private final RequestType requestType;
+ private final Object localHandler;
+ private final SCMRatisServer ratisHandler;
+
+ /**
+ * TODO.
+ */
+ public SCMHAInvocationHandler(final RequestType requestType,
+ final Object localHandler,
+ final SCMRatisServer ratisHandler) {
+ this.requestType = requestType;
+ this.localHandler = localHandler;
+ this.ratisHandler = ratisHandler;
+ }
+
+ @Override
+ public Object invoke(final Object proxy, final Method method,
+ final Object[] args) throws Throwable {
+ try {
+ long startTime = Time.monotonicNow();
+ final Object result = method.isAnnotationPresent(Replicate.class) ?
+ invokeRatis(method, args) : invokeLocal(method, args);
+ LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() - startTime);
+ return result;
+ } catch(InvocationTargetException iEx) {
+ throw iEx.getCause();
+ }
+ }
+
+ /**
+ * TODO.
+ */
+ private Object invokeLocal(Method method, Object[] args)
+ throws InvocationTargetException, IllegalAccessException {
+ LOG.trace("Invoking method {} on target {}", method, localHandler);
+ return method.invoke(localHandler, args);
+ }
+
+ /**
+ * TODO.
+ */
+ private Object invokeRatis(Method method, Object[] args)
+ throws Exception {
+ LOG.trace("Invoking method {} on target {}", method, ratisHandler);
+ final SCMRatisResponse response = ratisHandler.submitRequest(
+ SCMRatisRequest.of(requestType, method.getName(), args));
+ if (response.isSuccess()) {
+ return response.getResult();
+ }
+ // Should we unwrap and throw proper exception from here?
+ throw response.getException();
+ }
+
+}
\ No newline at end of file
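
Through the proxy built in ContainerStateManagerImpl.Builder, this handler serves reads locally and sends writes through Ratis. A sketch of both paths, where stateManager is the proxied instance:

    // Read path: getContainer(...) carries no @Replicate annotation, so the
    // handler invokes it directly on the local ContainerStateManagerImpl.
    final ContainerInfo info = stateManager.getContainer(containerID);

    // Write path: addContainer(...) is annotated with @Replicate, so the
    // handler wraps it into an SCMRatisRequest and submits it to the ring.
    stateManager.addContainer(containerInfoProto);
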
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
new file mode 100644
index 000000000000..b38fc4365b8c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+
+import java.io.IOException;
+
+/**
+ * SCMHAManager provides HA service for SCM.
+ *
+ * It uses Apache Ratis for HA implementation. We will have a 2N+1
+ * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
+ * nodes.
+ *
+ * TODO
+ *
+ */
+public class SCMHAManager {
+
+ private static boolean isLeader = true;
+
+ private final SCMRatisServer ratisServer;
+
+ /**
+ * Creates SCMHAManager instance.
+ */
+ public SCMHAManager(final ConfigurationSource conf) throws IOException {
+ this.ratisServer = new SCMRatisServer(
+ conf.getObject(SCMHAConfiguration.class), conf);
+ }
+
+ /**
+ * Starts HA service.
+ */
+ public void start() throws IOException {
+ ratisServer.start();
+ }
+
+ /**
+ * Returns true if the current SCM is the leader.
+ */
+ public static boolean isLeader() {
+ return isLeader;
+ }
+
+ /**
+ * Returns RatisServer instance associated with the SCM instance.
+ */
+ public SCMRatisServer getRatisServer() {
+ return ratisServer;
+ }
+
+ /**
+ * Stops the HA service.
+ */
+ public void shutdown() throws IOException {
+ ratisServer.stop();
+ }
+
+}
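
The expected lifecycle of the manager, as a sketch:

    // Construct with the SCM configuration source, start the embedded Ratis
    // server, and stop it on shutdown. isLeader() gates leader-only work.
    final SCMHAManager haManager = new SCMHAManager(conf);
    haManager.start();
    // ... serve requests ...
    haManager.shutdown();
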
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
new file mode 100644
index 000000000000..d65c23502b58
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolMessageEnum;
+
+import org.apache.ratis.protocol.Message;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.Method;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.MethodArgument;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisRequestProto;
+
+
+/**
+ * Represents the request that is sent to RatisServer.
+ */
+public final class SCMRatisRequest {
+
+ private final RequestType type;
+ private final String operation;
+ private final Object[] arguments;
+
+ private SCMRatisRequest(final RequestType type, final String operation,
+ final Object... arguments) {
+ this.type = type;
+ this.operation = operation;
+ this.arguments = arguments;
+ }
+
+ public static SCMRatisRequest of(final RequestType type,
+ final String operation,
+ final Object... arguments) {
+ return new SCMRatisRequest(type, operation, arguments);
+ }
+
+ /**
+ * Returns the type of request.
+ */
+ public RequestType getType() {
+ return type;
+ }
+
+ /**
+ * Returns the operation that this request represents.
+ */
+ public String getOperation() {
+ return operation;
+ }
+
+ /**
+ * Returns the arguments encoded in the request.
+ */
+ public Object[] getArguments() {
+ return arguments.clone();
+ }
+
+ /**
+ * Encodes the request into Ratis Message.
+ */
+ public Message encode() throws InvalidProtocolBufferException {
+ final SCMRatisRequestProto.Builder requestProtoBuilder =
+ SCMRatisRequestProto.newBuilder();
+ requestProtoBuilder.setType(type);
+
+ final Method.Builder methodBuilder = Method.newBuilder();
+ methodBuilder.setName(operation);
+
+ final List<MethodArgument> args = new ArrayList<>();
+ for (Object argument : arguments) {
+ final MethodArgument.Builder argBuilder = MethodArgument.newBuilder();
+ argBuilder.setType(argument.getClass().getCanonicalName());
+ if (argument instanceof GeneratedMessage) {
+ argBuilder.setValue(((GeneratedMessage) argument).toByteString());
+ } else if (argument instanceof ProtocolMessageEnum) {
+ argBuilder.setValue(ByteString.copyFrom(Ints.toByteArray(
+ ((ProtocolMessageEnum) argument).getNumber())));
+ } else {
+ throw new InvalidProtocolBufferException(argument.getClass() +
+ " is not a protobuf object!");
+ }
+ args.add(argBuilder.build());
+ }
+ methodBuilder.addAllArgs(args);
+ return Message.valueOf(
+ org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+ requestProtoBuilder.build().toByteArray()));
+ }
+
+ /**
+ * Decodes the request from Ratis Message.
+ */
+ public static SCMRatisRequest decode(Message message)
+ throws InvalidProtocolBufferException {
+ final SCMRatisRequestProto requestProto =
+ SCMRatisRequestProto.parseFrom(message.getContent().toByteArray());
+ final Method method = requestProto.getMethod();
+ List