diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index 0404530b2f1f..36b9a308e8fe 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -47,6 +47,10 @@ */ public class ContainerManagerImpl implements ContainerManagerV2 { + /* + * TODO: Introduce container level locks. + */ + /** * */ @@ -72,17 +76,18 @@ public class ContainerManagerImpl implements ContainerManagerV2 { * */ public ContainerManagerImpl( - // Introduce builder for this class? - final Configuration conf, final PipelineManager pipelineManager, - final SCMHAManager scmhaManager, + final Configuration conf, + final SCMHAManager scmHaManager, + final PipelineManager pipelineManager, final Table containerStore) throws IOException { + // Introduce builder for this class? this.lock = new ReentrantReadWriteLock(); this.pipelineManager = pipelineManager; - this.containerStateManager = ContainerStateManagerImpl.newBuilder() + this.containerStateManager = ContainerStateManagerImpl.newBuilder() .setConfiguration(conf) .setPipelineManager(pipelineManager) - .setRatisServer(scmhaManager.getRatisServer()) + .setRatisServer(scmHaManager.getRatisServer()) .setContainerStore(containerStore) .build(); } @@ -275,8 +280,8 @@ public void notifyContainerReportProcessing(final boolean isFullReport, } @Override - public void close() throws IOException { - throw new UnsupportedOperationException("Not yet implemented!"); + public void close() throws Exception { + containerStateManager.close(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java index 37c7b709d458..863ca4da66b9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java @@ -16,7 +16,6 @@ */ package org.apache.hadoop.hdds.scm.container; -import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -36,8 +35,8 @@ * mapping. This is used by SCM when allocating new locations and when * looking up a key. */ -public interface ContainerManagerV2 extends Closeable { - +public interface ContainerManagerV2 extends AutoCloseable { + // TODO: Rename this to ContainerManager /** * Returns all the container Ids managed by ContainerManager. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java index 16fe3407bde4..4f4456ace47d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; - import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Map; @@ -27,128 +26,92 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; -import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; -import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; -import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; -import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.Table.KeyValue; -import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.states.ContainerState; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; +import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler; +import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.ozone.common.statemachine.StateMachine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** - * TODO: Add javadoc. + * Default implementation of ContainerStateManager. This implementation + * holds the Container States in-memory which is backed by a persistent store. + * The persistent store is always kept in sync with the in-memory state changes. */ public final class ContainerStateManagerImpl implements ContainerStateManagerV2 { - /* ********************************************************************** - * Container Life Cycle * - * * - * Event and State Transition Mapping: * - * * - * State: OPEN ----------------> CLOSING * - * Event: FINALIZE * - * * - * State: CLOSING ----------------> QUASI_CLOSED * - * Event: QUASI_CLOSE * - * * - * State: CLOSING ----------------> CLOSED * - * Event: CLOSE * - * * - * State: QUASI_CLOSED ----------------> CLOSED * - * Event: FORCE_CLOSE * - * * - * State: CLOSED ----------------> DELETING * - * Event: DELETE * - * * - * State: DELETING ----------------> DELETED * - * Event: CLEANUP * - * * - * * - * Container State Flow: * - * * - * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] * - * (FINALIZE) | (QUASI_CLOSE) | * - * | | * - * | | * - * (CLOSE) | (FORCE_CLOSE) | * - * | | * - * | | * - * +--------->[CLOSED]<--------+ * - * | * - * (DELETE)| * - * | * - * | * - * [DELETING] * - * | * - * (CLEANUP) | * - * | * - * V * - * [DELETED] * - * * - ************************************************************************/ - /** - * + * Logger instance of ContainerStateManagerImpl. */ private static final Logger LOG = LoggerFactory.getLogger( ContainerStateManagerImpl.class); /** - * + * Configured container size. */ private final long containerSize; /** - * + * The container ID sequence which is used to create new container. + * This will be removed once we have a Distributed Sequence ID Generator. */ + @Deprecated private final AtomicLong nextContainerID; /** - * + * In-memory representation of Container States. */ private final ContainerStateMap containers; /** - * + * Persistent store for Container States. */ - private final PipelineManager pipelineManager; + private Table containerStore; /** - * + * PipelineManager instance. */ - private Table containerStore; + private final PipelineManager pipelineManager; /** - * + * Container lifecycle state machine. */ private final StateMachine stateMachine; /** - * + * We use the containers in round-robin fashion for operations like block + * allocation. This map is used for remembering the last used container. */ private final ConcurrentHashMap lastUsedMap; /** + * constructs ContainerStateManagerImpl instance and loads the containers + * form the persistent storage. * + * @param conf the Configuration + * @param pipelineManager the {@link PipelineManager} instance + * @param containerStore the persistent storage + * @throws IOException in case of error while loading the containers */ private ContainerStateManagerImpl(final Configuration conf, final PipelineManager pipelineManager, @@ -158,7 +121,7 @@ private ContainerStateManagerImpl(final Configuration conf, this.containerStore = containerStore; this.stateMachine = newStateMachine(); this.containerSize = getConfiguredContainerSize(conf); - this.nextContainerID = new AtomicLong(); + this.nextContainerID = new AtomicLong(1L); this.containers = new ContainerStateMap(); this.lastUsedMap = new ConcurrentHashMap<>(); @@ -166,7 +129,9 @@ private ContainerStateManagerImpl(final Configuration conf, } /** + * Creates and initializes a new Container Lifecycle StateMachine. * + * @return the Container Lifecycle StateMachine */ private StateMachine newStateMachine() { @@ -208,7 +173,9 @@ private StateMachine newStateMachine() { } /** + * Returns the configured container size. * + * @return the max size of container */ private long getConfiguredContainerSize(final Configuration conf) { return (long) conf.getStorageSize( @@ -218,7 +185,9 @@ private long getConfiguredContainerSize(final Configuration conf) { } /** + * Loads the containers from container store into memory. * + * @throws IOException in case of error while loading the containers */ private void initialize() throws IOException { TableIterator> @@ -282,16 +251,20 @@ public void addContainer(final ContainerInfoProto containerInfo) Preconditions.checkNotNull(containerInfo); final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo); - if (getContainer(container.containerID()) == null) { - Preconditions.checkArgument(nextContainerID.get() - == container.containerID().getId(), - "ContainerID mismatch."); - - pipelineManager.addContainerToPipeline( - container.getPipelineID(), container.containerID()); - containers.addContainer(container); - nextContainerID.incrementAndGet(); - } + final ContainerID containerID = container.containerID(); + final PipelineID pipelineID = container.getPipelineID(); + + /* + * TODO: + * Check if the container already exist in in ContainerStateManager. + * This optimization can be done after moving ContainerNotFoundException + * from ContainerStateMap to ContainerManagerImpl. + */ + + containerStore.put(containerID, container); + containers.addContainer(container); + pipelineManager.addContainerToPipeline(pipelineID, containerID); + nextContainerID.incrementAndGet(); } void updateContainerState(final ContainerID containerID, @@ -337,7 +310,9 @@ void removeContainer(final ContainerID containerID) throw new UnsupportedOperationException("Not yet implemented!"); } - void close() throws IOException { + @Override + public void close() throws Exception { + containerStore.close(); } public static Builder newBuilder() { @@ -382,7 +357,6 @@ public ContainerStateManagerV2 build() throws IOException { final ContainerStateManagerV2 csm = new ContainerStateManagerImpl( conf, pipelineMgr, table); - scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm); final SCMHAInvocationHandler invocationHandler = new SCMHAInvocationHandler(RequestType.CONTAINER, csm, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java index 9960354be402..3520b0146e23 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java @@ -17,20 +17,26 @@ package org.apache.hadoop.hdds.scm.container; +import java.io.IOException; +import java.util.Set; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.metadata.Replicate; -import java.io.IOException; -import java.util.Set; - /** + * A ContainerStateManager is responsible for keeping track of all the + * container and its state inside SCM, it also exposes methods to read and + * modify the container and its state. * - * TODO: Add proper javadoc. + * All the mutation operations are marked with {@link Replicate} annotation so + * that when SCM-HA is enabled, the mutations are replicated from leader SCM + * to the followers. * - * Implementation of methods marked with {@code @Replicate} annotation should be + * When a method is marked with {@link Replicate} annotation it should follow + * the below rules. * - * 1. Idempotent + * 1. The method call should be Idempotent * 2. Arguments should be of protobuf objects * 3. Return type should be of protobuf object * 4. The declaration should throw RaftException @@ -38,13 +44,65 @@ */ public interface ContainerStateManagerV2 { + //TODO: Rename this to ContainerStateManager + + /* ********************************************************************** + * Container Life Cycle * + * * + * Event and State Transition Mapping: * + * * + * State: OPEN ----------------> CLOSING * + * Event: FINALIZE * + * * + * State: CLOSING ----------------> QUASI_CLOSED * + * Event: QUASI_CLOSE * + * * + * State: CLOSING ----------------> CLOSED * + * Event: CLOSE * + * * + * State: QUASI_CLOSED ----------------> CLOSED * + * Event: FORCE_CLOSE * + * * + * State: CLOSED ----------------> DELETING * + * Event: DELETE * + * * + * State: DELETING ----------------> DELETED * + * Event: CLEANUP * + * * + * * + * Container State Flow: * + * * + * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] * + * (FINALIZE) | (QUASI_CLOSE) | * + * | | * + * | | * + * (CLOSE) | (FORCE_CLOSE) | * + * | | * + * | | * + * +--------->[CLOSED]<--------+ * + * | * + * (DELETE)| * + * | * + * | * + * [DELETING] * + * | * + * (CLEANUP) | * + * | * + * V * + * [DELETED] * + * * + ************************************************************************/ + /** - * + * Returns a new container ID which can be used for allocating a new + * container. */ ContainerID getNextContainerID(); /** + * Returns the ID of all the managed containers. * + * @return Set of {@link ContainerID} */ Set getContainerIDs(); @@ -72,4 +130,8 @@ Set getContainerReplicas(ContainerID containerID) void addContainer(ContainerInfoProto containerInfo) throws IOException; + /** + * + */ + void close() throws Exception; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 8cef966995eb..d71049b7052e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -535,6 +535,7 @@ private void flushCache(final ContainerInfo... containerInfos) { } } + // TODO: Move container not found exception to upper layer. private void checkIfContainerExist(ContainerID containerID) throws ContainerNotFoundException { if (!containerMap.containsKey(containerID)) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java index c78c6161ac9c..cbe2ce38ef41 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java @@ -50,6 +50,7 @@ public SCMHAInvocationHandler(final RequestType requestType, this.requestType = requestType; this.localHandler = localHandler; this.ratisHandler = ratisHandler; + ratisHandler.registerStateMachineHandler(requestType, localHandler); } @Override @@ -71,8 +72,9 @@ public Object invoke(final Object proxy, final Method method, */ private Object invokeLocal(Method method, Object[] args) throws InvocationTargetException, IllegalAccessException { - LOG.trace("Invoking method {} on target {}", method, localHandler); - return method.invoke(method, args); + LOG.trace("Invoking method {} on target {} with arguments {}", + method, localHandler, args); + return method.invoke(localHandler, args); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java index b38fc4365b8c..eb6c8006c5e3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java @@ -17,60 +17,30 @@ package org.apache.hadoop.hdds.scm.ha; -import org.apache.hadoop.hdds.conf.ConfigurationSource; - import java.io.IOException; /** * SCMHAManager provides HA service for SCM. - * - * It uses Apache Ratis for HA implementation. We will have a 2N+1 - * node Ratis ring. The Ratis ring will have one Leader node and 2N follower - * nodes. - * - * TODO - * */ -public class SCMHAManager { - - private static boolean isLeader = true; - - private final SCMRatisServer ratisServer; - - /** - * Creates SCMHAManager instance. - */ - public SCMHAManager(final ConfigurationSource conf) throws IOException { - this.ratisServer = new SCMRatisServer( - conf.getObject(SCMHAConfiguration.class), conf); - } +public interface SCMHAManager { /** * Starts HA service. */ - public void start() throws IOException { - ratisServer.start(); - } + void start() throws IOException; /** * Returns true if the current SCM is the leader. */ - public static boolean isLeader() { - return isLeader; - } + boolean isLeader(); /** * Returns RatisServer instance associated with the SCM instance. */ - public SCMRatisServer getRatisServer() { - return ratisServer; - } + SCMRatisServer getRatisServer(); /** * Stops the HA service. */ - public void shutdown() throws IOException { - ratisServer.stop(); - } - + void shutdown() throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java new file mode 100644 index 000000000000..89ac714993c7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ *

http://www.apache.org/licenses/LICENSE-2.0 + *

+ *

Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; + +import java.io.IOException; + +/** + * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1 + * node Ratis ring. The Ratis ring will have one Leader node and 2N follower + * nodes. + * + * TODO + * + */ +public class SCMHAManagerImpl implements SCMHAManager { + + private static boolean isLeader = true; + + private final SCMRatisServerImpl ratisServer; + + /** + * Creates SCMHAManager instance. + */ + public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException { + this.ratisServer = new SCMRatisServerImpl( + conf.getObject(SCMHAConfiguration.class), conf); + } + + /** + * {@inheritDoc} + */ + @Override + public void start() throws IOException { + ratisServer.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLeader() { + return isLeader; + } + + /** + * {@inheritDoc} + */ + @Override + public SCMRatisServer getRatisServer() { + return ratisServer; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IOException { + ratisServer.stop(); + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java index c4bedcc0e4c2..21ca4be50841 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java @@ -67,6 +67,10 @@ public Exception getException() { public static Message encode(final Object result) throws InvalidProtocolBufferException { + if (result == null) { + return Message.EMPTY; + } + final ByteString value; if (result instanceof GeneratedMessage) { value = ((GeneratedMessage) result).toByteString(); @@ -98,6 +102,10 @@ public static SCMRatisResponse decode(RaftClientReply reply) private static Object deserializeResult(byte[] response) throws InvalidProtocolBufferException { + if (response.length == 0) { + return null; + } + final SCMRatisResponseProto responseProto = SCMRatisResponseProto.parseFrom(response); try { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java index 209535d14a8f..4ddbc7b63ac9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java @@ -17,93 +17,22 @@ package org.apache.hadoop.hdds.scm.ha; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; + import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftGroupId; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServer; /** * TODO. */ -public class SCMRatisServer { - - private final InetSocketAddress address; - private final RaftServer server; - private final RaftGroupId raftGroupId; - private final RaftGroup raftGroup; - private final RaftPeerId raftPeerId; - private final SCMStateMachine scmStateMachine; - private final ClientId clientId = ClientId.randomId(); - private final AtomicLong callId = new AtomicLong(); +public interface SCMRatisServer { + void start() throws IOException; - // TODO: Refactor and remove ConfigurationSource and use only - // SCMHAConfiguration. - SCMRatisServer(final SCMHAConfiguration haConf, - final ConfigurationSource conf) - throws IOException { - final String scmServiceId = "SCM-HA-Service"; - final String scmNodeId = "localhost"; - this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId); - this.address = haConf.getRatisBindAddress(); - final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address); - final List raftPeers = new ArrayList<>(); - raftPeers.add(localRaftPeer); - final RaftProperties serverProperties = RatisUtil - .newRaftProperties(haConf, conf); - this.raftGroupId = RaftGroupId.valueOf( - UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8))); - this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); - this.scmStateMachine = new SCMStateMachine(); - this.server = RaftServer.newBuilder() - .setServerId(raftPeerId) - .setGroup(raftGroup) - .setProperties(serverProperties) - .setStateMachine(scmStateMachine) - .build(); - } - - void start() throws IOException { - server.start(); - } - - public void registerStateMachineHandler(final RequestType handlerType, - final Object handler) { - scmStateMachine.registerHandler(handlerType, handler); - } + void registerStateMachineHandler(RequestType handlerType, Object handler); SCMRatisResponse submitRequest(SCMRatisRequest request) - throws IOException, ExecutionException, InterruptedException { - final RaftClientRequest raftClientRequest = new RaftClientRequest( - clientId, server.getId(), raftGroupId, nextCallId(), request.encode(), - RaftClientRequest.writeRequestType(), null); - final RaftClientReply raftClientReply = - server.submitClientRequestAsync(raftClientRequest).get(); - return SCMRatisResponse.decode(raftClientReply); - } - - private long nextCallId() { - return callId.getAndIncrement() & Long.MAX_VALUE; - } - - void stop() throws IOException { - server.close(); - } + throws IOException, ExecutionException, InterruptedException; + void stop() throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java new file mode 100644 index 000000000000..45ae212ebb66 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; + +/** + * TODO. + */ +public class SCMRatisServerImpl implements SCMRatisServer { + + private final InetSocketAddress address; + private final RaftServer server; + private final RaftGroupId raftGroupId; + private final RaftGroup raftGroup; + private final RaftPeerId raftPeerId; + private final SCMStateMachine scmStateMachine; + private final ClientId clientId = ClientId.randomId(); + private final AtomicLong callId = new AtomicLong(); + + + // TODO: Refactor and remove ConfigurationSource and use only + // SCMHAConfiguration. + SCMRatisServerImpl(final SCMHAConfiguration haConf, + final ConfigurationSource conf) + throws IOException { + final String scmServiceId = "SCM-HA-Service"; + final String scmNodeId = "localhost"; + this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId); + this.address = haConf.getRatisBindAddress(); + final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address); + final List raftPeers = new ArrayList<>(); + raftPeers.add(localRaftPeer); + final RaftProperties serverProperties = RatisUtil + .newRaftProperties(haConf, conf); + this.raftGroupId = RaftGroupId.valueOf( + UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8))); + this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); + this.scmStateMachine = new SCMStateMachine(); + this.server = RaftServer.newBuilder() + .setServerId(raftPeerId) + .setGroup(raftGroup) + .setProperties(serverProperties) + .setStateMachine(scmStateMachine) + .build(); + } + + @Override + public void start() throws IOException { + server.start(); + } + + @Override + public void registerStateMachineHandler(final RequestType handlerType, + final Object handler) { + scmStateMachine.registerHandler(handlerType, handler); + } + + @Override + public SCMRatisResponse submitRequest(SCMRatisRequest request) + throws IOException, ExecutionException, InterruptedException { + final RaftClientRequest raftClientRequest = new RaftClientRequest( + clientId, server.getId(), raftGroupId, nextCallId(), request.encode(), + RaftClientRequest.writeRequestType(), null); + final RaftClientReply raftClientReply = + server.submitClientRequestAsync(raftClientRequest).get(); + return SCMRatisResponse.decode(raftClientReply); + } + + private long nextCallId() { + return callId.getAndIncrement() & Long.MAX_VALUE; + } + + @Override + public void stop() throws IOException { + server.close(); + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java new file mode 100644 index 000000000000..022d3921df0b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import java.io.File; +import java.util.UUID; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager; +import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition; +import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + + +/** + * Tests to verify the functionality of ContainerManager. + */ +public class TestContainerManagerImpl { + + private File testDir; + private DBStore dbStore; + private ContainerManagerV2 containerManager; + + @Before + public void setUp() throws Exception { + final OzoneConfiguration conf = SCMTestUtils.getConf(); + testDir = GenericTestUtils.getTestDir( + TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID()); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath()); + dbStore = DBStoreBuilder.createDBStore( + conf, new SCMDBDefinition()); + final PipelineManager pipelineManager = MockPipelineManager.getInstance(); + pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + containerManager = new ContainerManagerImpl(conf, + MockSCMHAManager.getInstance(), pipelineManager, + SCMDBDefinition.CONTAINERS.getTable(dbStore)); + } + + @After + public void cleanup() throws Exception { + if(containerManager != null) { + containerManager.close(); + } + + if (dbStore != null) { + dbStore.close(); + } + + FileUtil.fullyDelete(testDir); + } + + @Test + public void testAllocateContainer() throws Exception { + Assert.assertTrue(containerManager.getContainerIDs().isEmpty()); + final ContainerInfo container = containerManager.allocateContainer( + HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, "admin"); + Assert.assertEquals(1, containerManager.getContainerIDs().size()); + Assert.assertNotNull(containerManager.getContainer( + container.containerID())); + } + +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java new file mode 100644 index 000000000000..c3b14fb405bb --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.StateMachineException; + +/** + * Mock SCMHAManager implementation for testing. + */ +public final class MockSCMHAManager implements SCMHAManager { + + private final SCMRatisServer ratisServer; + + public static SCMHAManager getInstance() { + return new MockSCMHAManager(); + } + + /** + * Creates MockSCMHAManager instance. + */ + private MockSCMHAManager() { + this.ratisServer = new MockRatisServer(); + } + + @Override + public void start() throws IOException { + ratisServer.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLeader() { + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public SCMRatisServer getRatisServer() { + return ratisServer; + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() throws IOException { + ratisServer.stop(); + } + + private static class MockRatisServer implements SCMRatisServer { + + private Map handlers = + new EnumMap<>(RequestType.class); + + @Override + public void start() { + } + + @Override + public void registerStateMachineHandler(final RequestType handlerType, + final Object handler) { + handlers.put(handlerType, handler); + } + + @Override + public SCMRatisResponse submitRequest(final SCMRatisRequest request) + throws IOException { + final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf( + RaftPeerId.valueOf("peer"), RaftGroupId.randomId()); + RaftClientReply reply; + try { + final Message result = process(request); + return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(), + raftId, 1L, true, result, null, 1L, null)); + } catch (Exception ex) { + return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(), + raftId, 1L, false, null, + new StateMachineException(raftId, ex), 1L, null)); + } + } + + private Message process(final SCMRatisRequest request) + throws Exception { + try { + final Object handler = handlers.get(request.getType()); + + if (handler == null) { + throw new IOException("No handler found for request type " + + request.getType()); + } + + final List> argumentTypes = new ArrayList<>(); + for(Object args : request.getArguments()) { + argumentTypes.add(args.getClass()); + } + final Object result = handler.getClass().getMethod( + request.getOperation(), argumentTypes.toArray(new Class[0])) + .invoke(handler, request.getArguments()); + + return SCMRatisResponse.encode(result); + } catch (NoSuchMethodException | SecurityException ex) { + throw new InvalidProtocolBufferException(ex.getMessage()); + } catch (InvocationTargetException e) { + final Exception targetEx = (Exception) e.getTargetException(); + throw targetEx != null ? targetEx : e; + } + } + + @Override + public void stop() { + } + } + +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java new file mode 100644 index 000000000000..5dd60824838b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Mock PipelineManager implementation for testing. + */ +public final class MockPipelineManager implements PipelineManager { + + private PipelineStateManager stateManager; + + public static PipelineManager getInstance() { + return new MockPipelineManager(); + } + + private MockPipelineManager() { + this.stateManager = new PipelineStateManager(); + } + + @Override + public Pipeline createPipeline(final ReplicationType type, + final ReplicationFactor factor) + throws IOException { + final List nodes = Stream.generate( + MockDatanodeDetails::randomDatanodeDetails) + .limit(factor.getNumber()).collect(Collectors.toList()); + final Pipeline pipeline = Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setType(type) + .setFactor(factor) + .setNodes(nodes) + .setState(Pipeline.PipelineState.OPEN) + .build(); + stateManager.addPipeline(pipeline); + return pipeline; + } + + @Override + public Pipeline createPipeline(final ReplicationType type, + final ReplicationFactor factor, + final List nodes) { + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setType(type) + .setFactor(factor) + .setNodes(nodes) + .setState(Pipeline.PipelineState.OPEN) + .build(); + } + + @Override + public Pipeline getPipeline(final PipelineID pipelineID) + throws PipelineNotFoundException { + return stateManager.getPipeline(pipelineID); + } + + @Override + public boolean containsPipeline(final PipelineID pipelineID) { + try { + return stateManager.getPipeline(pipelineID) != null; + } catch (PipelineNotFoundException e) { + return false; + } + } + + @Override + public List getPipelines() { + return stateManager.getPipelines(); + } + + @Override + public List getPipelines(final ReplicationType type) { + return stateManager.getPipelines(type); + } + + @Override + public List getPipelines(final ReplicationType type, + final ReplicationFactor factor) { + return stateManager.getPipelines(type, factor); + } + + @Override + public List getPipelines(final ReplicationType type, + final Pipeline.PipelineState state) { + return stateManager.getPipelines(type, state); + } + + @Override + public List getPipelines(final ReplicationType type, + final ReplicationFactor factor, + final Pipeline.PipelineState state) { + return stateManager.getPipelines(type, factor, state); + } + + @Override + public List getPipelines(final ReplicationType type, + final ReplicationFactor factor, final Pipeline.PipelineState state, + final Collection excludeDns, + final Collection excludePipelines) { + return stateManager.getPipelines(type, factor, state, + excludeDns, excludePipelines); + } + + @Override + public void addContainerToPipeline(final PipelineID pipelineID, + final ContainerID containerID) + throws IOException { + stateManager.addContainerToPipeline(pipelineID, containerID); + } + + @Override + public void removeContainerFromPipeline(final PipelineID pipelineID, + final ContainerID containerID) + throws IOException { + stateManager.removeContainerFromPipeline(pipelineID, containerID); + } + + @Override + public NavigableSet getContainersInPipeline( + final PipelineID pipelineID) throws IOException { + return getContainersInPipeline(pipelineID); + } + + @Override + public int getNumberOfContainers(final PipelineID pipelineID) + throws IOException { + return getContainersInPipeline(pipelineID).size(); + } + + @Override + public void openPipeline(final PipelineID pipelineId) + throws IOException { + stateManager.openPipeline(pipelineId); + } + + @Override + public void finalizeAndDestroyPipeline(final Pipeline pipeline, + final boolean onTimeout) + throws IOException { + stateManager.finalizePipeline(pipeline.getId()); + } + + @Override + public void scrubPipeline(final ReplicationType type, + final ReplicationFactor factor) + throws IOException { + + } + + @Override + public void startPipelineCreator() { + + } + + @Override + public void triggerPipelineCreation() { + + } + + @Override + public void incNumBlocksAllocatedMetric(final PipelineID id) { + + } + + @Override + public void activatePipeline(final PipelineID pipelineID) + throws IOException { + + } + + @Override + public void deactivatePipeline(final PipelineID pipelineID) + throws IOException { + stateManager.deactivatePipeline(pipelineID); + } + + @Override + public boolean getSafeModeStatus() { + return false; + } + + @Override + public void close() throws IOException { + + } + + @Override + public Map getPipelineInfo() { + return null; + } + + @Override + public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus, + final EventPublisher publisher) { + + } +} \ No newline at end of file