diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 81445ea1220c..74e70bbe058a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -192,5 +192,5 @@ void deleteContainer(ContainerID containerID)
* Returns containerStateManger.
* @return containerStateManger
*/
- ContainerStateManagerV2 getContainerStateManager();
+ ContainerStateManager getContainerStateManager();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index d316cd8765c9..5e220dff59bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -83,7 +83,7 @@ public class ContainerManagerImpl implements ContainerManager {
/**
*
*/
- private final ContainerStateManagerV2 containerStateManager;
+ private final ContainerStateManager containerStateManager;
private final SCMHAManager haManager;
private final SequenceIdGenerator sequenceIdGen;
@@ -443,7 +443,7 @@ public void close() throws IOException {
// Remove this after fixing Recon
@VisibleForTesting
- public ContainerStateManagerV2 getContainerStateManager() {
+ public ContainerStateManager getContainerStateManager() {
return containerStateManager;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index e5ddba9d2a93..dbd10a3fc81c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,530 +18,170 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.states.ContainerState;
-import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AtomicLongMap;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * A container state manager keeps track of container states and returns
- * containers that match various queries.
- *
- * This state machine is driven by a combination of server and client actions.
- *
- * This is how a create container happens: 1. When a container is created, the
- * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
- * has chosen a pipeline for container to live on. However, the container is not
- * created yet. This container along with the pipeline is returned to the
- * client.
- *
- * 2. The client when it sees the Container state as ALLOCATED understands that
- * container needs to be created on the specified pipeline. The client lets the
- * SCM know that saw this flag and is initiating the on the data nodes.
- *
- * This is done by calling into notifyObjectCreation(ContainerName,
- * BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
- * into CREATING. All this state means is that SCM told Client to create a
- * container and client saw that request.
- *
- * 3. Then client makes calls to datanodes directly, asking the datanodes to
- * create the container. This is done with the help of pipeline that supports
- * this container.
- *
- * 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifying the containerName and the
- * COMPLETE_CREATE as the Event.
- *
- * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
- * the state when clients can write to a container.
- *
- * 6. If the client does not respond with the COMPLETE_CREATE event with a
- * certain time, the state machine times out and triggers a delete operation of
- * the container.
- *
- * Please see the function initializeStateMachine below to see how this looks in
- * code.
- *
- * Reusing existing container :
- *
- * The create container call is not made all the time, the system tries to use
- * open containers as much as possible. So in those cases, it looks thru the
- * list of open containers and will return containers that match the specific
- * signature.
- *
- * Please note : Logically there are 3 separate state machines in the case of
- * containers.
- *
- * The Create State Machine -- Commented extensively above.
- *
- * Open/Close State Machine - Once the container is in the Open State,
- * eventually it will be closed, once sufficient data has been written to it.
- *
- * TimeOut Delete Container State Machine - if the container creating times out,
- * then Container State manager decides to delete the container.
+ * A ContainerStateManager is responsible for keeping track of all the
+ * container and its state inside SCM, it also exposes methods to read and
+ * modify the container and its state.
+ *
+ * All the mutation operations are marked with {@link Replicate} annotation so
+ * that when SCM-HA is enabled, the mutations are replicated from leader SCM
+ * to the followers.
+ *
+ * When a method is marked with {@link Replicate} annotation it should follow
+ * the below rules.
+ *
+ * 1. The method call should be Idempotent
+ * 2. Arguments should be of protobuf objects
+ * 3. Return type should be of protobuf object
+ * 4. The declaration should throw RaftException
+ *
*/
-public class ContainerStateManager {
- private static final Logger LOG =
- LoggerFactory.getLogger(ContainerStateManager.class);
-
- private final StateMachine stateMachine;
-
- private final long containerSize;
- private final boolean autoCreateRatisOne;
- private final ConcurrentHashMap lastUsedMap;
- private final ContainerStateMap containers;
- private final AtomicLong containerCount;
- private final AtomicLongMap containerStateCount =
- AtomicLongMap.create();
+public interface ContainerStateManager {
+
+ /* **********************************************************************
+ * Container Life Cycle *
+ * *
+ * Event and State Transition Mapping: *
+ * *
+ * State: OPEN ----------------> CLOSING *
+ * Event: FINALIZE *
+ * *
+ * State: CLOSING ----------------> QUASI_CLOSED *
+ * Event: QUASI_CLOSE *
+ * *
+ * State: CLOSING ----------------> CLOSED *
+ * Event: CLOSE *
+ * *
+ * State: QUASI_CLOSED ----------------> CLOSED *
+ * Event: FORCE_CLOSE *
+ * *
+ * State: CLOSED ----------------> DELETING *
+ * Event: DELETE *
+ * *
+ * State: DELETING ----------------> DELETED *
+ * Event: CLEANUP *
+ * *
+ * *
+ * Container State Flow: *
+ * *
+ * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
+ * (FINALIZE) | (QUASI_CLOSE) | *
+ * | | *
+ * | | *
+ * (CLOSE) | (FORCE_CLOSE) | *
+ * | | *
+ * | | *
+ * +--------->[CLOSED]<--------+ *
+ * | *
+ * (DELETE)| *
+ * | *
+ * | *
+ * [DELETING] *
+ * | *
+ * (CLEANUP) | *
+ * | *
+ * V *
+ * [DELETED] *
+ * *
+ ************************************************************************/
/**
- * Constructs a Container State Manager that tracks all containers owned by
- * SCM for the purpose of allocation of blocks.
- *
- * TODO : Add Container Tags so we know which containers are owned by SCM.
- */
- @SuppressWarnings("unchecked")
- public ContainerStateManager(final ConfigurationSource configuration) {
-
- // Initialize the container state machine.
- final Set finalStates = new HashSet();
-
- // These are the steady states of a container.
- finalStates.add(LifeCycleState.OPEN);
- finalStates.add(LifeCycleState.CLOSED);
- finalStates.add(LifeCycleState.DELETED);
-
- this.stateMachine = new StateMachine<>(LifeCycleState.OPEN,
- finalStates);
- initializeStateMachine();
-
- this.containerSize = (long) configuration.getStorageSize(
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
- StorageUnit.BYTES);
- this.autoCreateRatisOne = configuration.getBoolean(
- ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
- ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);
-
- this.lastUsedMap = new ConcurrentHashMap<>();
- this.containerCount = new AtomicLong(0);
- this.containers = new ContainerStateMap();
- }
-
- /*
- *
- * Event and State Transition Mapping:
- *
- * State: OPEN ----------------> CLOSING
- * Event: FINALIZE
- *
- * State: CLOSING ----------------> QUASI_CLOSED
- * Event: QUASI_CLOSE
- *
- * State: CLOSING ----------------> CLOSED
- * Event: CLOSE
- *
- * State: QUASI_CLOSED ----------------> CLOSED
- * Event: FORCE_CLOSE
- *
- * State: CLOSED ----------------> DELETING
- * Event: DELETE
- *
- * State: DELETING ----------------> DELETED
- * Event: CLEANUP
- *
- *
- * Container State Flow:
- *
- * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]
- * (FINALIZE) | (QUASI_CLOSE) |
- * | |
- * | |
- * (CLOSE) | (FORCE_CLOSE) |
- * | |
- * | |
- * +--------->[CLOSED]<--------+
- * |
- * (DELETE)|
- * |
- * |
- * [DELETING]
- * |
- * (CLEANUP) |
- * |
- * V
- * [DELETED]
*
*/
- private void initializeStateMachine() {
- stateMachine.addTransition(LifeCycleState.OPEN,
- LifeCycleState.CLOSING,
- LifeCycleEvent.FINALIZE);
-
- stateMachine.addTransition(LifeCycleState.CLOSING,
- LifeCycleState.QUASI_CLOSED,
- LifeCycleEvent.QUASI_CLOSE);
-
- stateMachine.addTransition(LifeCycleState.CLOSING,
- LifeCycleState.CLOSED,
- LifeCycleEvent.CLOSE);
-
- stateMachine.addTransition(LifeCycleState.QUASI_CLOSED,
- LifeCycleState.CLOSED,
- LifeCycleEvent.FORCE_CLOSE);
-
- stateMachine.addTransition(LifeCycleState.CLOSED,
- LifeCycleState.DELETING,
- LifeCycleEvent.DELETE);
-
- stateMachine.addTransition(LifeCycleState.DELETING,
- LifeCycleState.DELETED,
- LifeCycleEvent.CLEANUP);
- }
-
-
- void loadContainer(final ContainerInfo containerInfo) throws SCMException {
- containers.addContainer(containerInfo);
- containerCount.set(Long.max(
- containerInfo.getContainerID(), containerCount.get()));
- containerStateCount.incrementAndGet(containerInfo.getState());
- }
+ boolean contains(HddsProtos.ContainerID containerID);
/**
- * Allocates a new container based on the type, replication etc.
+ * Returns the ID of all the managed containers.
+ *
+ * @return Set of {@link ContainerID}
*/
- ContainerInfo allocateContainer(final PipelineManager pipelineManager,
- final ReplicationConfig replicationConfig, final String owner)
- throws IOException {
- final List pipelines = pipelineManager
- .getPipelines(replicationConfig, Pipeline.PipelineState.OPEN);
- Pipeline pipeline;
-
- boolean bgCreateOne = RatisReplicationConfig
- .hasFactor(replicationConfig, ReplicationFactor.ONE)
- && autoCreateRatisOne;
- boolean bgCreateThree = RatisReplicationConfig
- .hasFactor(replicationConfig, ReplicationFactor.THREE);
-
- if (!pipelines.isEmpty() && (bgCreateOne || bgCreateThree)) {
- // let background create Ratis pipelines.
- pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
- } else {
- try {
- pipeline = pipelineManager.createPipeline(replicationConfig);
- pipelineManager.waitPipelineReady(pipeline.getId(), 0);
- } catch (IOException e) {
-
- if (pipelines.isEmpty()) {
- throw new IOException("Could not allocate container. Cannot get any" +
- " matching pipeline for replicationConfig:" + replicationConfig +
- ", State:PipelineState.OPEN");
- }
- pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
- }
- }
-
- synchronized (pipeline) {
- return allocateContainer(pipelineManager, owner, pipeline);
- }
- }
+ Set getContainerIDs();
/**
- * Allocates a new container based on the type, replication etc.
- * This method should be called only after the lock on the pipeline is held
- * on which the container will be allocated.
*
- * @param pipelineManager - Pipeline Manager class.
- * @param owner - Owner of the container.
- * @param pipeline - Pipeline to which the container needs to be
- * allocated.
- * @return ContainerWithPipeline
- * @throws IOException on Failure.
*/
- ContainerInfo allocateContainer(
- final PipelineManager pipelineManager, final String owner,
- Pipeline pipeline) throws IOException {
- Preconditions.checkNotNull(pipeline,
- "Pipeline couldn't be found for the new container. "
- + "Do you have enough nodes?");
-
- final long containerID = containerCount.incrementAndGet();
- final ContainerInfo containerInfo = new ContainerInfo.Builder()
- .setState(LifeCycleState.OPEN)
- .setPipelineID(pipeline.getId())
- .setUsedBytes(0)
- .setNumberOfKeys(0)
- .setStateEnterTime(Time.now())
- .setOwner(owner)
- .setContainerID(containerID)
- .setDeleteTransactionId(0)
- .setReplicationConfig(pipeline.getReplicationConfig())
- .build();
- addContainerInfo(containerID, containerInfo, pipelineManager, pipeline);
- if (LOG.isTraceEnabled()) {
- LOG.trace("New container allocated: {}", containerInfo);
- }
- return containerInfo;
- }
-
- public void addContainerInfo(long containerID,
- ContainerInfo containerInfo,
- PipelineManager pipelineManager,
- Pipeline pipeline) throws IOException {
- Preconditions.checkNotNull(containerInfo);
- containers.addContainer(containerInfo);
- if (pipeline != null) {
- // In Recon, while adding a 'new' CLOSED container, pipeline will be a
- // random ID, and hence be passed down as null.
- pipelineManager.addContainerToPipeline(pipeline.getId(),
- ContainerID.valueOf(containerID));
- }
- containerStateCount.incrementAndGet(containerInfo.getState());
- }
+ Set getContainerIDs(LifeCycleState state);
/**
- * Update the Container State to the next state.
*
- * @param containerID - ContainerID
- * @param event - LifeCycle Event
- * @throws SCMException on Failure.
*/
- void updateContainerState(final ContainerID containerID,
- final HddsProtos.LifeCycleEvent event)
- throws SCMException, ContainerNotFoundException {
- final ContainerInfo info = containers.getContainerInfo(containerID);
- try {
- final LifeCycleState oldState = info.getState();
- final LifeCycleState newState = stateMachine.getNextState(
- info.getState(), event);
- containers.updateState(containerID, info.getState(), newState);
- containerStateCount.incrementAndGet(newState);
- containerStateCount.decrementAndGet(oldState);
- } catch (InvalidStateTransitionException ex) {
- String error = String.format("Failed to update container state %s, " +
- "reason: invalid state transition from state: %s upon " +
- "event: %s.",
- containerID, info.getState(), event);
- LOG.error(error);
- throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
- }
- }
+ ContainerInfo getContainer(HddsProtos.ContainerID id);
/**
- * Update deleteTransactionId for a container.
*
- * @param deleteTransactionMap maps containerId to its new
- * deleteTransactionID
*/
- void updateDeleteTransactionId(
- final Map deleteTransactionMap) {
- deleteTransactionMap.forEach((k, v) -> {
- containers.getContainerInfo(ContainerID.valueOf(k))
- .updateDeleteTransactionId(v);
- });
- }
-
+ Set getContainerReplicas(HddsProtos.ContainerID id);
/**
- * Return a container matching the attributes specified.
*
- * @param size - Space needed in the Container.
- * @param owner - Owner of the container - A specific nameservice.
- * @param pipelineID - ID of the pipeline
- * @param containerIDs - Set of containerIDs to choose from
- * @return ContainerInfo, null if there is no match found.
*/
- ContainerInfo getMatchingContainer(final long size, String owner,
- PipelineID pipelineID, NavigableSet containerIDs) {
- if (containerIDs.isEmpty()) {
- return null;
- }
-
- // Get the last used container and find container above the last used
- // container ID.
- final ContainerState key = new ContainerState(owner, pipelineID);
- final ContainerID lastID =
- lastUsedMap.getOrDefault(key, containerIDs.first());
-
- // There is a small issue here. The first time, we will skip the first
- // container. But in most cases it will not matter.
- NavigableSet resultSet = containerIDs.tailSet(lastID, false);
- if (resultSet.size() == 0) {
- resultSet = containerIDs;
- }
-
- ContainerInfo selectedContainer =
- findContainerWithSpace(size, resultSet, owner, pipelineID);
- if (selectedContainer == null) {
-
- // If we did not find any space in the tailSet, we need to look for
- // space in the headset, we need to pass true to deal with the
- // situation that we have a lone container that has space. That is we
- // ignored the last used container under the assumption we can find
- // other containers with space, but if have a single container that is
- // not true. Hence we need to include the last used container as the
- // last element in the sorted set.
-
- resultSet = containerIDs.headSet(lastID, true);
- selectedContainer =
- findContainerWithSpace(size, resultSet, owner, pipelineID);
- }
-
- return selectedContainer;
- }
-
- private ContainerInfo findContainerWithSpace(final long size,
- final NavigableSet searchSet, final String owner,
- final PipelineID pipelineID) {
- // Get the container with space to meet our request.
- for (ContainerID id : searchSet) {
- final ContainerInfo containerInfo = containers.getContainerInfo(id);
- if (containerInfo.getUsedBytes() + size <= this.containerSize) {
- containerInfo.updateLastUsedTime();
- return containerInfo;
- }
- }
- return null;
- }
-
- Set getAllContainerIDs() {
- return containers.getAllContainerIDs();
- }
+ void updateContainerReplica(HddsProtos.ContainerID id,
+ ContainerReplica replica);
/**
- * Returns Containers by State.
*
- * @param state - State - Open, Closed etc.
- * @return List of containers by state.
*/
- Set getContainerIDsByState(final LifeCycleState state) {
- return containers.getContainerIDsByState(state);
- }
+ void removeContainerReplica(HddsProtos.ContainerID id,
+ ContainerReplica replica);
/**
- * Get count of containers in the current {@link LifeCycleState}.
*
- * @param state {@link LifeCycleState}
- * @return Count of containers
*/
- Integer getContainerCountByState(final LifeCycleState state) {
- return Long.valueOf(containerStateCount.get(state)).intValue();
- }
+ @Replicate
+ void addContainer(ContainerInfoProto containerInfo)
+ throws IOException;
/**
- * Returns a set of ContainerIDs that match the Container.
*
- * @param owner Owner of the Containers.
- * @param repConfig - Replication Config of the containers
- * @param state - Current State, like Open, Close etc.
- * @return Set of containers that match the specific query parameters.
*/
- NavigableSet getMatchingContainerIDs(final String owner,
- final ReplicationConfig repConfig, final LifeCycleState state) {
- return containers.getMatchingContainerIDs(state, owner, repConfig);
- }
+ @Replicate
+ void updateContainerState(HddsProtos.ContainerID id,
+ HddsProtos.LifeCycleEvent event)
+ throws IOException, InvalidStateTransitionException;
/**
- * Returns the containerInfo for the given container id.
- * @param containerID id of the container
- * @return ContainerInfo containerInfo
- * @throws IOException
+ *
*/
- ContainerInfo getContainer(final ContainerID containerID)
- throws ContainerNotFoundException {
- final ContainerInfo container = containers.getContainerInfo(containerID);
- if (container != null) {
- return container;
- }
- throw new ContainerNotFoundException(containerID.toString());
- }
-
- void close() throws IOException {
- }
+ // Make this as @Replicate
+ void updateDeleteTransactionId(Map deleteTransactionMap)
+ throws IOException;
/**
- * Returns the latest list of DataNodes where replica for given containerId
- * exist. Throws an SCMException if no entry is found for given containerId.
*
- * @param containerID
- * @return Set
*/
- Set getContainerReplicas(
- final ContainerID containerID) throws ContainerNotFoundException {
- return containers.getContainerReplicas(containerID);
- }
+ ContainerInfo getMatchingContainer(long size, String owner,
+ PipelineID pipelineID,
+ NavigableSet containerIDs);
/**
- * Add a container Replica for given DataNode.
*
- * @param containerID
- * @param replica
*/
- void updateContainerReplica(final ContainerID containerID,
- final ContainerReplica replica) throws ContainerNotFoundException {
- containers.updateContainerReplica(containerID, replica);
- }
+ @Replicate
+ void removeContainer(HddsProtos.ContainerID containerInfo)
+ throws IOException;
/**
- * Remove a container Replica for given DataNode.
- *
- * @param containerID
- * @param replica
- * @return True of dataNode is removed successfully else false.
+ * Reinitialize the ContainerStateManager with container store.
+ * @param containerStore container table.
+ * @throws IOException
*/
- void removeContainerReplica(final ContainerID containerID,
- final ContainerReplica replica)
- throws ContainerNotFoundException, ContainerReplicaNotFoundException {
- containers.removeContainerReplica(containerID, replica);
- }
-
- void removeContainer(final ContainerID containerID)
- throws ContainerNotFoundException {
- if (containers.getContainerInfo(containerID) == null) {
- throw new ContainerNotFoundException(containerID.toString());
- }
- containers.removeContainer(containerID);
- }
+ void reinitialize(Table containerStore)
+ throws IOException;
/**
- * Update the lastUsedmap to update with ContainerState and containerID.
- * @param pipelineID
- * @param containerID
- * @param owner
+ *
*/
- public synchronized void updateLastUsedMap(PipelineID pipelineID,
- ContainerID containerID, String owner) {
- lastUsedMap.put(new ContainerState(owner, pipelineID),
- containerID);
- }
-
+ void close() throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
index 2abc6d02bdc6..5c74575036ab 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -79,7 +79,7 @@
* This class is NOT thread safe. All the calls are idempotent.
*/
public final class ContainerStateManagerImpl
- implements ContainerStateManagerV2 {
+ implements ContainerStateManager {
/**
* Logger instance of ContainerStateManagerImpl.
@@ -561,21 +561,21 @@ public Builder setContainerStore(
return this;
}
- public ContainerStateManagerV2 build() throws IOException {
+ public ContainerStateManager build() throws IOException {
Preconditions.checkNotNull(conf);
Preconditions.checkNotNull(pipelineMgr);
Preconditions.checkNotNull(table);
- final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
+ final ContainerStateManager csm = new ContainerStateManagerImpl(
conf, pipelineMgr, table, transactionBuffer);
final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
scmRatisServer);
- return (ContainerStateManagerV2) Proxy.newProxyInstance(
+ return (ContainerStateManager) Proxy.newProxyInstance(
SCMHAInvocationHandler.class.getClassLoader(),
- new Class>[]{ContainerStateManagerV2.class}, invocationHandler);
+ new Class>[]{ContainerStateManager.class}, invocationHandler);
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
deleted file mode 100644
index 276e21c9b834..000000000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.container;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-
-/**
- * A ContainerStateManager is responsible for keeping track of all the
- * container and its state inside SCM, it also exposes methods to read and
- * modify the container and its state.
- *
- * All the mutation operations are marked with {@link Replicate} annotation so
- * that when SCM-HA is enabled, the mutations are replicated from leader SCM
- * to the followers.
- *
- * When a method is marked with {@link Replicate} annotation it should follow
- * the below rules.
- *
- * 1. The method call should be Idempotent
- * 2. Arguments should be of protobuf objects
- * 3. Return type should be of protobuf object
- * 4. The declaration should throw RaftException
- *
- */
-public interface ContainerStateManagerV2 {
-
- //TODO: Rename this to ContainerStateManager
-
- /* **********************************************************************
- * Container Life Cycle *
- * *
- * Event and State Transition Mapping: *
- * *
- * State: OPEN ----------------> CLOSING *
- * Event: FINALIZE *
- * *
- * State: CLOSING ----------------> QUASI_CLOSED *
- * Event: QUASI_CLOSE *
- * *
- * State: CLOSING ----------------> CLOSED *
- * Event: CLOSE *
- * *
- * State: QUASI_CLOSED ----------------> CLOSED *
- * Event: FORCE_CLOSE *
- * *
- * State: CLOSED ----------------> DELETING *
- * Event: DELETE *
- * *
- * State: DELETING ----------------> DELETED *
- * Event: CLEANUP *
- * *
- * *
- * Container State Flow: *
- * *
- * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
- * (FINALIZE) | (QUASI_CLOSE) | *
- * | | *
- * | | *
- * (CLOSE) | (FORCE_CLOSE) | *
- * | | *
- * | | *
- * +--------->[CLOSED]<--------+ *
- * | *
- * (DELETE)| *
- * | *
- * | *
- * [DELETING] *
- * | *
- * (CLEANUP) | *
- * | *
- * V *
- * [DELETED] *
- * *
- ************************************************************************/
-
- /**
- *
- */
- boolean contains(HddsProtos.ContainerID containerID);
-
- /**
- * Returns the ID of all the managed containers.
- *
- * @return Set of {@link ContainerID}
- */
- Set getContainerIDs();
-
- /**
- *
- */
- Set getContainerIDs(LifeCycleState state);
-
- /**
- *
- */
- ContainerInfo getContainer(HddsProtos.ContainerID id);
-
- /**
- *
- */
- Set getContainerReplicas(HddsProtos.ContainerID id);
-
- /**
- *
- */
- void updateContainerReplica(HddsProtos.ContainerID id,
- ContainerReplica replica);
-
- /**
- *
- */
- void removeContainerReplica(HddsProtos.ContainerID id,
- ContainerReplica replica);
-
- /**
- *
- */
- @Replicate
- void addContainer(ContainerInfoProto containerInfo)
- throws IOException;
-
- /**
- *
- */
- @Replicate
- void updateContainerState(HddsProtos.ContainerID id,
- HddsProtos.LifeCycleEvent event)
- throws IOException, InvalidStateTransitionException;
-
- /**
- *
- */
- // Make this as @Replicate
- void updateDeleteTransactionId(Map deleteTransactionMap)
- throws IOException;
-
- /**
- *
- */
- ContainerInfo getMatchingContainer(long size, String owner,
- PipelineID pipelineID,
- NavigableSet containerIDs);
-
- /**
- *
- */
- @Replicate
- void removeContainer(HddsProtos.ContainerID containerInfo)
- throws IOException;
-
- /**
- * Reinitialize the ContainerStateManager with container store.
- * @param containerStore container table.
- * @throws IOException
- */
- void reinitialize(Table containerStore)
- throws IOException;
-
- /**
- *
- */
- void close() throws IOException;
-}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 31acfcf46c4a..068f3d774c22 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -92,6 +92,7 @@
public final class TestUtils {
private static ThreadLocalRandom random = ThreadLocalRandom.current();
+ private static PipelineID randomPipelineID = PipelineID.randomId();
private TestUtils() {
}
@@ -621,7 +622,7 @@ public static StorageContainerManager getScm(OzoneConfiguration conf,
return StorageContainerManager.createSCM(conf, configurator);
}
- public static ContainerInfo getContainer(
+ private static ContainerInfo.Builder getDefaultContainerInfoBuilder(
final HddsProtos.LifeCycleState state) {
return new ContainerInfo.Builder()
.setContainerID(RandomUtils.nextLong())
@@ -629,7 +630,20 @@ public static ContainerInfo getContainer(
new RatisReplicationConfig(ReplicationFactor.THREE))
.setState(state)
.setSequenceId(10000L)
- .setOwner("TEST")
+ .setOwner("TEST");
+ }
+
+ public static ContainerInfo getContainer(
+ final HddsProtos.LifeCycleState state) {
+ return getDefaultContainerInfoBuilder(state)
+ .setPipelineID(randomPipelineID)
+ .build();
+ }
+
+ public static ContainerInfo getContainer(
+ final HddsProtos.LifeCycleState state, PipelineID pipelineID) {
+ return getDefaultContainerInfoBuilder(state)
+ .setPipelineID(pipelineID)
.build();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 2d92992d40b3..cbaf3bf79387 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -16,7 +16,9 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,24 +27,35 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -59,28 +72,49 @@ public class TestContainerReportHandler {
private ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private EventPublisher publisher;
+ private File testDir;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
+ private PipelineManager pipelineManager;
@Before
public void setup() throws IOException, InvalidStateTransitionException {
- final ConfigurationSource conf = new OzoneConfiguration();
- this.nodeManager = new MockNodeManager(true, 10);
- this.containerManager = Mockito.mock(ContainerManager.class);
- this.containerStateManager = new ContainerStateManager(conf);
- this.publisher = Mockito.mock(EventPublisher.class);
-
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
+ nodeManager = new MockNodeManager(true, 10);
+ containerManager = Mockito.mock(ContainerManager.class);
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ nodeManager = new MockNodeManager(true, 10);
+ pipelineManager =
+ new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+ containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+ publisher = Mockito.mock(EventPublisher.class);
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainer((ContainerID)invocation.getArguments()[0]));
+ .getContainer(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+ .getContainerReplicas(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
Mockito.doAnswer(invocation -> {
containerStateManager
- .updateContainerState((ContainerID)invocation.getArguments()[0],
+ .updateContainerState(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerState(
@@ -89,7 +123,7 @@ public void setup() throws IOException, InvalidStateTransitionException {
Mockito.doAnswer(invocation -> {
containerStateManager.updateContainerReplica(
- (ContainerID) invocation.getArguments()[0],
+ ((ContainerID)invocation.getArguments()[0]).getProtobuf(),
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
@@ -97,7 +131,7 @@ public void setup() throws IOException, InvalidStateTransitionException {
Mockito.doAnswer(invocation -> {
containerStateManager.removeContainerReplica(
- (ContainerID) invocation.getArguments()[0],
+ ((ContainerID)invocation.getArguments()[0]).getProtobuf(),
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).removeContainerReplica(
@@ -106,14 +140,18 @@ public void setup() throws IOException, InvalidStateTransitionException {
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws Exception {
containerStateManager.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
@Test
public void testUnderReplicatedContainer()
- throws NodeNotFoundException, ContainerNotFoundException, SCMException {
-
+ throws NodeNotFoundException, IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
final Iterator nodeIterator = nodeManager.getNodes(
@@ -132,32 +170,20 @@ public void testUnderReplicatedContainer()
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- containerStateManager.loadContainer(containerOne);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
getReplicas(containerOne.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
- .forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- containerOne.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ .forEach(r -> containerStateManager.updateContainerReplica(
+ containerOne.containerID().getProtobuf(), r));
getReplicas(containerTwo.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
- .forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ .forEach(r -> containerStateManager.updateContainerReplica(
+ containerTwo.containerID().getProtobuf(), r));
// SCM expects both containerOne and containerTwo to be in all the three
@@ -180,7 +206,7 @@ public void testUnderReplicatedContainer()
@Test
public void testOverReplicatedContainer() throws NodeNotFoundException,
- SCMException, ContainerNotFoundException {
+ IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
@@ -203,32 +229,20 @@ public void testOverReplicatedContainer() throws NodeNotFoundException,
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- containerStateManager.loadContainer(containerOne);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
getReplicas(containerOne.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
- .forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- containerOne.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ .forEach(r -> containerStateManager.updateContainerReplica(
+ containerOne.containerID().getProtobuf(), r));
getReplicas(containerTwo.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
- .forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ .forEach(r -> containerStateManager.updateContainerReplica(
+ containerTwo.containerID().getProtobuf(), r));
// SCM expects both containerOne and containerTwo to be in all the three
@@ -296,26 +310,16 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException {
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- containerStateManager.loadContainer(containerOne);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
- containerOneReplicas.forEach(r -> {
- try {
+ containerOneReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerTwo.containerID().getProtobuf(), r));
- containerTwoReplicas.forEach(r -> {
- try {
+ containerTwoReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerTwo.containerID().getProtobuf(), r));
final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -325,7 +329,8 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException {
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED,
+ containerManager.getContainer(containerOne.containerID()).getState());
}
@Test
@@ -373,26 +378,16 @@ public void testClosingToQuasiClosed()
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- containerStateManager.loadContainer(containerOne);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
- containerOneReplicas.forEach(r -> {
- try {
+ containerOneReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerTwo.containerID().getProtobuf(), r));
- containerTwoReplicas.forEach(r -> {
- try {
+ containerTwoReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerTwo.containerID().getProtobuf(), r));
final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -402,7 +397,8 @@ public void testClosingToQuasiClosed()
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState());
+ Assert.assertEquals(LifeCycleState.QUASI_CLOSED,
+ containerManager.getContainer(containerOne.containerID()).getState());
}
@Test
@@ -453,26 +449,16 @@ public void testQuasiClosedToClosed()
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
- containerStateManager.loadContainer(containerOne);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(containerOne.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
- containerOneReplicas.forEach(r -> {
- try {
+ containerOneReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
+ containerTwo.containerID().getProtobuf(), r));
- }
- });
-
- containerTwoReplicas.forEach(r -> {
- try {
+ containerTwoReplicas.forEach(r ->
containerStateManager.updateContainerReplica(
- containerTwo.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerTwo.containerID().getProtobuf(), r));
final ContainerReportsProto containerReport = getContainerReportsProto(
@@ -483,38 +469,47 @@ public void testQuasiClosedToClosed()
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
- Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED,
+ containerManager.getContainer(containerOne.containerID()).getState());
}
@Test
public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
- throws SCMException {
+ throws IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
final Iterator nodeIterator = nodeManager.getNodes(
NodeStatus.inServiceHealthy()).iterator();
+ Pipeline pipeline = pipelineManager.createPipeline(
+ new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
+
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerReplicaProto.State replicaState
= ContainerReplicaProto.State.OPEN;
- final ContainerInfo containerOne = getContainer(LifeCycleState.OPEN);
+ final ContainerInfo containerOne =
+ getContainer(LifeCycleState.OPEN, pipeline.getId());
- containerStateManager.loadContainer(containerOne);
+ containerStateManager.addContainer(containerOne.getProtobuf());
// Container loaded, no replicas reported from DNs. Expect zeros for
// usage values.
- assertEquals(0L, containerOne.getUsedBytes());
- assertEquals(0L, containerOne.getNumberOfKeys());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(0L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 50L, 60L), publisher);
// Single replica reported - ensure values are updated
- assertEquals(50L, containerOne.getUsedBytes());
- assertEquals(60L, containerOne.getNumberOfKeys());
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
@@ -524,8 +519,10 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
datanodeThree, 50L, 60L), publisher);
// All 3 DNs are reporting the same values. Counts should be as expected.
- assertEquals(50L, containerOne.getUsedBytes());
- assertEquals(60L, containerOne.getNumberOfKeys());
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
// Now each DN reports a different lesser value. Counts should be the min
// reported.
@@ -541,8 +538,10 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
// All 3 DNs are reporting different values. The actual value should be the
// minimum.
- assertEquals(1L, containerOne.getUsedBytes());
- assertEquals(10L, containerOne.getNumberOfKeys());
+ assertEquals(1L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(10L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
// Have the lowest value report a higher value and ensure the new value
// is the minimum
@@ -550,13 +549,15 @@ public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
containerOne.containerID(), replicaState,
datanodeOne, 3L, 12L), publisher);
- assertEquals(2L, containerOne.getUsedBytes());
- assertEquals(11L, containerOne.getNumberOfKeys());
+ assertEquals(2L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(11L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
}
@Test
public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
- throws SCMException {
+ throws IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
final Iterator nodeIterator = nodeManager.getNodes(
@@ -570,7 +571,7 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
= ContainerReplicaProto.State.CLOSED;
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
- containerStateManager.loadContainer(containerOne);
+ containerStateManager.addContainer(containerOne.getProtobuf());
// Container loaded, no replicas reported from DNs. Expect zeros for
// usage values.
assertEquals(0L, containerOne.getUsedBytes());
@@ -581,8 +582,10 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
datanodeOne, 50L, 60L), publisher);
// Single replica reported - ensure values are updated
- assertEquals(50L, containerOne.getUsedBytes());
- assertEquals(60L, containerOne.getNumberOfKeys());
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
@@ -592,8 +595,10 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
datanodeThree, 50L, 60L), publisher);
// All 3 DNs are reporting the same values. Counts should be as expected.
- assertEquals(50L, containerOne.getUsedBytes());
- assertEquals(60L, containerOne.getNumberOfKeys());
+ assertEquals(50L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(60L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
// Now each DN reports a different lesser value. Counts should be the max
// reported.
@@ -609,8 +614,10 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
// All 3 DNs are reporting different values. The actual value should be the
// maximum.
- assertEquals(3L, containerOne.getUsedBytes());
- assertEquals(12L, containerOne.getNumberOfKeys());
+ assertEquals(3L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(12L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
// Have the highest value report a lower value and ensure the new value
// is the new maximumu
@@ -618,13 +625,15 @@ public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
containerOne.containerID(), replicaState,
datanodeThree, 1L, 10L), publisher);
- assertEquals(2L, containerOne.getUsedBytes());
- assertEquals(11L, containerOne.getNumberOfKeys());
+ assertEquals(2L, containerManager.getContainer(containerOne.containerID())
+ .getUsedBytes());
+ assertEquals(11L, containerManager.getContainer(containerOne.containerID())
+ .getNumberOfKeys());
}
@Test
public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException,
- SCMException, ContainerNotFoundException {
+ IOException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
@@ -639,7 +648,7 @@ public void testStaleReplicaOfDeletedContainer() throws NodeNotFoundException,
.collect(Collectors.toSet());
nodeManager.setContainers(datanodeOne, containerIDSet);
- containerStateManager.loadContainer(containerOne);
+ containerStateManager.addContainer(containerOne.getProtobuf());
// Expects the replica will be deleted.
final ContainerReportsProto containerReport = getContainerReportsProto(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index 11c18a401fb1..1bd07f794341 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -17,21 +17,34 @@
package org.apache.hadoop.hdds.scm.container;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -45,12 +58,51 @@
public class TestContainerStateManager {
private ContainerStateManager containerStateManager;
+ private PipelineManager pipelineManager;
+ private SCMHAManager scmhaManager;
+ private File testDir;
+ private DBStore dbStore;
+ private Pipeline pipeline;
@Before
public void init() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
- containerStateManager = new ContainerStateManager(conf);
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ pipelineManager = Mockito.mock(PipelineManager.class);
+ pipeline = Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .setReplicationConfig(new StandaloneReplicationConfig(
+ ReplicationFactor.THREE))
+ .setNodes(new ArrayList<>()).build();
+ when(pipelineManager.createPipeline(new StandaloneReplicationConfig(
+ ReplicationFactor.THREE))).thenReturn(pipeline);
+ when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
+ .thenReturn(true);
+
+
+ containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
+
+ }
+ @After
+ public void tearDown() throws Exception {
+ containerStateManager.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
@Test
@@ -68,7 +120,7 @@ public void checkReplicationStateOK() throws IOException {
//WHEN
Set replicas = containerStateManager
- .getContainerReplicas(c1.containerID());
+ .getContainerReplicas(c1.containerID().getProtobuf());
//THEN
Assert.assertEquals(3, replicas.size());
@@ -88,41 +140,38 @@ public void checkReplicationStateMissingReplica() throws IOException {
//WHEN
Set replicas = containerStateManager
- .getContainerReplicas(c1.containerID());
+ .getContainerReplicas(c1.containerID().getProtobuf());
Assert.assertEquals(2, replicas.size());
Assert.assertEquals(3, c1.getReplicationConfig().getRequiredNodes());
}
- private void addReplica(ContainerInfo cont, DatanodeDetails node)
- throws ContainerNotFoundException {
+ private void addReplica(ContainerInfo cont, DatanodeDetails node) {
ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(cont.containerID())
.setContainerState(ContainerReplicaProto.State.CLOSED)
.setDatanodeDetails(node)
.build();
containerStateManager
- .updateContainerReplica(cont.containerID(), replica);
+ .updateContainerReplica(cont.containerID().getProtobuf(), replica);
}
private ContainerInfo allocateContainer() throws IOException {
- PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
-
- Pipeline pipeline =
- Pipeline.newBuilder().setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .setReplicationConfig(new StandaloneReplicationConfig(
- ReplicationFactor.THREE))
- .setNodes(new ArrayList<>()).build();
-
- when(pipelineManager.createPipeline(new StandaloneReplicationConfig(
- ReplicationFactor.THREE))).thenReturn(pipeline);
-
- return containerStateManager.allocateContainer(pipelineManager,
- new StandaloneReplicationConfig(
- ReplicationFactor.THREE), "root");
+ final ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setState(HddsProtos.LifeCycleState.OPEN)
+ .setPipelineID(pipeline.getId())
+ .setUsedBytes(0)
+ .setNumberOfKeys(0)
+ .setStateEnterTime(Time.now())
+ .setOwner("root")
+ .setContainerID(1)
+ .setDeleteTransactionId(0)
+ .setReplicationConfig(pipeline.getReplicationConfig())
+ .build();
+ containerStateManager.addContainer(containerInfo.getProtobuf());
+ return containerInfo;
}
}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index 0aa8e5f62ece..e8d4b38650ea 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -27,12 +28,17 @@
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
@@ -40,6 +46,8 @@
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.After;
@@ -48,6 +56,7 @@
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -75,6 +84,10 @@ public class TestIncrementalContainerReportHandler {
private EventPublisher publisher;
private HDDSLayoutVersionManager versionManager;
private SCMContext scmContext = SCMContext.emptyContext();
+ private PipelineManager pipelineManager;
+ private File testDir;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
@Before
public void setup() throws IOException, InvalidStateTransitionException {
@@ -96,22 +109,40 @@ public void setup() throws IOException, InvalidStateTransitionException {
this.nodeManager =
new SCMNodeManager(conf, storageConfig, eventQueue, clusterMap,
scmContext, versionManager);
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+
+ pipelineManager =
+ new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+
+ this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
- this.containerStateManager = new ContainerStateManager(conf);
this.publisher = Mockito.mock(EventPublisher.class);
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainer((ContainerID)invocation.getArguments()[0]));
+ .getContainer(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+ .getContainerReplicas(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
Mockito.doAnswer(invocation -> {
containerStateManager
- .removeContainerReplica((ContainerID)invocation.getArguments()[0],
+ .removeContainerReplica(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf(),
(ContainerReplica)invocation.getArguments()[1]);
return null;
}).when(containerManager).removeContainerReplica(
@@ -120,7 +151,8 @@ public void setup() throws IOException, InvalidStateTransitionException {
Mockito.doAnswer(invocation -> {
containerStateManager
- .updateContainerState((ContainerID)invocation.getArguments()[0],
+ .updateContainerState(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf(),
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerState(
@@ -129,7 +161,8 @@ public void setup() throws IOException, InvalidStateTransitionException {
Mockito.doAnswer(invocation -> {
containerStateManager
- .updateContainerReplica((ContainerID)invocation.getArguments()[0],
+ .updateContainerReplica(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf(),
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
@@ -139,8 +172,13 @@ public void setup() throws IOException, InvalidStateTransitionException {
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws Exception {
containerStateManager.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
@@ -161,15 +199,9 @@ public void testClosingToClosed() throws IOException {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
- containerStateManager.loadContainer(container);
- containerReplicas.forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- container.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerStateManager.addContainer(container.getProtobuf());
+ containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+ container.containerID().getProtobuf(), r));
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
@@ -179,7 +211,8 @@ public void testClosingToClosed() throws IOException {
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
- Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED,
+ containerManager.getContainer(container.containerID()).getState());
}
@Test
@@ -199,15 +232,9 @@ public void testClosingToQuasiClosed() throws IOException {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
- containerStateManager.loadContainer(container);
- containerReplicas.forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- container.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerStateManager.addContainer(container.getProtobuf());
+ containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+ container.containerID().getProtobuf(), r));
final IncrementalContainerReportProto containerReport =
@@ -218,7 +245,8 @@ public void testClosingToQuasiClosed() throws IOException {
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
- Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState());
+ Assert.assertEquals(LifeCycleState.QUASI_CLOSED,
+ containerManager.getContainer(container.containerID()).getState());
}
@Test
@@ -242,15 +270,9 @@ public void testQuasiClosedToClosed() throws IOException {
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeThree));
- containerStateManager.loadContainer(container);
- containerReplicas.forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- container.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerStateManager.addContainer(container.getProtobuf());
+ containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+ container.containerID().getProtobuf(), r));
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
@@ -260,7 +282,8 @@ public void testQuasiClosedToClosed() throws IOException {
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icr, publisher);
- Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
+ Assert.assertEquals(LifeCycleState.CLOSED,
+ containerManager.getContainer(container.containerID()).getState());
}
@Test
@@ -280,17 +303,11 @@ public void testDeleteContainer() throws IOException {
CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
- containerStateManager.loadContainer(container);
- containerReplicas.forEach(r -> {
- try {
- containerStateManager.updateContainerReplica(
- container.containerID(), r);
- } catch (ContainerNotFoundException ignored) {
-
- }
- });
+ containerStateManager.addContainer(container.getProtobuf());
+ containerReplicas.forEach(r -> containerStateManager.updateContainerReplica(
+ container.containerID().getProtobuf(), r));
Assert.assertEquals(3, containerStateManager
- .getContainerReplicas(container.containerID()).size());
+ .getContainerReplicas(container.containerID().getProtobuf()).size());
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.DELETED,
@@ -300,7 +317,7 @@ public void testDeleteContainer() throws IOException {
datanodeOne, containerReport);
reportHandler.onMessage(icr, publisher);
Assert.assertEquals(2, containerStateManager
- .getContainerReplicas(container.containerID()).size());
+ .getContainerReplicas(container.containerID().getProtobuf()).size());
}
@Test
@@ -319,8 +336,8 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
final DatanodeDetails datanode = randomDatanodeDetails();
nodeManager.register(datanode, null, null);
- containerStateManager.loadContainer(container);
- containerStateManager.loadContainer(containerTwo);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.addContainer(containerTwo.getProtobuf());
Assert.assertEquals(0, nodeManager.getContainers(datanode).size());
@@ -359,7 +376,8 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
// If we find "container" in the NM, then we must also have it in
// Container Manager.
Assert.assertEquals(1, containerStateManager
- .getContainerReplicas(container.containerID()).size());
+ .getContainerReplicas(container.containerID().getProtobuf())
+ .size());
Assert.assertEquals(2, nmContainers.size());
} else {
// If the race condition occurs as mentioned in HDDS-5249, then this
@@ -367,11 +385,13 @@ public void testICRFCRRace() throws IOException, NodeNotFoundException,
// NM, but have found something for it in ContainerManager, and that
// should not happen. It should be in both, or neither.
Assert.assertEquals(0, containerStateManager
- .getContainerReplicas(container.containerID()).size());
+ .getContainerReplicas(container.containerID().getProtobuf())
+ .size());
Assert.assertEquals(1, nmContainers.size());
}
Assert.assertEquals(1, containerStateManager
- .getContainerReplicas(containerTwo.containerID()).size());
+ .getContainerReplicas(containerTwo.containerID().getProtobuf())
+ .size());
}
} finally {
executor.shutdown();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index b4dc1fe745ab..1fd06b6ee034 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -20,11 +20,13 @@
import com.google.common.primitives.Longs;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.protocol.proto
@@ -36,13 +38,14 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -50,6 +53,7 @@
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
@@ -90,6 +94,7 @@
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.mockito.Mockito.when;
/**
* Test cases to verify the functionality of ReplicationManager.
@@ -108,10 +113,13 @@ public class TestReplicationManager {
private TestClock clock;
private File testDir;
private DBStore dbStore;
+ private PipelineManager pipelineManager;
+ private SCMHAManager scmhaManager;
@Before
public void setup()
- throws IOException, InterruptedException, NodeNotFoundException {
+ throws IOException, InterruptedException,
+ NodeNotFoundException, InvalidStateTransitionException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
@@ -121,7 +129,22 @@ public void setup()
containerManager = Mockito.mock(ContainerManager.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
- containerStateManager = new ContainerStateManager(conf);
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ pipelineManager = Mockito.mock(PipelineManager.class);
+ when(pipelineManager.containsPipeline(Mockito.any(PipelineID.class)))
+ .thenReturn(true);
+ containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
serviceManager = new SCMServiceManager();
datanodeCommandHandler = new DatanodeCommandHandler();
@@ -129,29 +152,31 @@ public void setup()
Mockito.when(containerManager.getContainers())
.thenAnswer(invocation -> {
- Set ids = containerStateManager.getAllContainerIDs();
+ Set ids = containerStateManager.getContainerIDs();
List containers = new ArrayList<>();
for (ContainerID id : ids) {
- containers.add(containerStateManager.getContainer(id));
+ containers.add(containerStateManager.getContainer(
+ id.getProtobuf()));
}
return containers;
});
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainer((ContainerID)invocation.getArguments()[0]));
+ .getContainer(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
- .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
+ .getContainerReplicas(((ContainerID)invocation
+ .getArguments()[0]).getProtobuf()));
containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
Mockito.when(containerPlacementPolicy.chooseDatanodes(
- Mockito.any(),
- Mockito.any(),
- Mockito.anyInt(), Mockito.anyLong(), Mockito.anyLong()))
+ Mockito.any(), Mockito.any(), Mockito.anyInt(),
+ Mockito.anyLong(), Mockito.anyLong()))
.thenAnswer(invocation -> {
int count = (int) invocation.getArguments()[2];
return IntStream.range(0, count)
@@ -202,6 +227,15 @@ private void createReplicationManager(ReplicationManagerConfiguration rmConf)
Thread.sleep(100L);
}
+ @After
+ public void tearDown() throws Exception {
+ containerStateManager.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
+ }
/**
* Checks if restarting of replication manager works.
@@ -224,9 +258,9 @@ public void testReplicationManagerRestart() throws InterruptedException {
* any action on OPEN containers.
*/
@Test
- public void testOpenContainer() throws SCMException, InterruptedException {
+ public void testOpenContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
replicationManager.processAll();
eventQueue.processAll(1000);
Assert.assertEquals(0, datanodeCommandHandler.getInvocation());
@@ -238,12 +272,11 @@ public void testOpenContainer() throws SCMException, InterruptedException {
* to all the datanodes.
*/
@Test
- public void testClosingContainer() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testClosingContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final ContainerID id = container.containerID();
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
// Two replicas in CLOSING state
final Set replicas = getReplicas(id, State.CLOSING,
@@ -255,7 +288,7 @@ public void testClosingContainer() throws
replicas.addAll(getReplicas(id, State.OPEN, datanode));
for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
@@ -268,7 +301,7 @@ public void testClosingContainer() throws
// Update the OPEN to CLOSING
for (ContainerReplica replica : getReplicas(id, State.CLOSING, datanode)) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
replicationManager.processAll();
@@ -284,8 +317,7 @@ public void testClosingContainer() throws
* datanodes.
*/
@Test
- public void testQuasiClosedContainerWithTwoOpenReplica() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testQuasiClosedContainerWithTwoOpenReplica() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -297,10 +329,11 @@ public void testQuasiClosedContainerWithTwoOpenReplica() throws
final ContainerReplica replicaThree = getReplicas(
id, State.OPEN, 1000L, datanodeDetails.getUuid(), datanodeDetails);
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
final int currentCloseCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
@@ -323,8 +356,7 @@ public void testQuasiClosedContainerWithTwoOpenReplica() throws
* the container, ReplicationManager will not do anything.
*/
@Test
- public void testHealthyQuasiClosedContainer() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testHealthyQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -335,10 +367,11 @@ public void testHealthyQuasiClosedContainer() throws
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
// All the QUASI_CLOSED replicas have same originNodeId, so the
// container will not be closed. ReplicationManager should take no action.
@@ -358,8 +391,7 @@ public void testHealthyQuasiClosedContainer() throws
*/
@Test
public void testQuasiClosedContainerWithUnhealthyReplica()
- throws SCMException, ContainerNotFoundException, InterruptedException,
- ContainerReplicaNotFoundException {
+ throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
@@ -371,10 +403,11 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
final ContainerReplica replicaThree = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -391,7 +424,8 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
final ContainerReplica unhealthyReplica = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId,
replicaOne.getDatanodeDetails());
- containerStateManager.updateContainerReplica(id, unhealthyReplica);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), unhealthyReplica);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -404,7 +438,7 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
replicationManager.getMetrics().getNumDeletionCmdsSent());
// Now we will delete the unhealthy replica from in-memory.
- containerStateManager.removeContainerReplica(id, replicaOne);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), replicaOne);
final long currentBytesToReplicate = replicationManager.getMetrics()
.getNumReplicationBytesTotal();
@@ -430,7 +464,8 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
.get(id).get(0).getDatanode();
final ContainerReplica replicatedReplicaOne = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
- containerStateManager.updateContainerReplica(id, replicatedReplicaOne);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicatedReplicaOne);
final long currentReplicationCommandCompleted = replicationManager
.getMetrics().getNumReplicationCmdsCompleted();
@@ -454,8 +489,7 @@ public void testQuasiClosedContainerWithUnhealthyReplica()
* deletes the excess replicas.
*/
@Test
- public void testOverReplicatedQuasiClosedContainer() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testOverReplicatedQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -468,11 +502,12 @@ public void testOverReplicatedQuasiClosedContainer() throws
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -491,13 +526,17 @@ public void testOverReplicatedQuasiClosedContainer() throws
DatanodeDetails targetDn = replicationManager.getInflightDeletion()
.get(id).get(0).getDatanode();
if (targetDn.equals(replicaOne.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(id, replicaOne);
+ containerStateManager.removeContainerReplica(
+ id.getProtobuf(), replicaOne);
} else if (targetDn.equals(replicaTwo.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(id, replicaTwo);
+ containerStateManager.removeContainerReplica(
+ id.getProtobuf(), replicaTwo);
} else if (targetDn.equals(replicaThree.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(id, replicaThree);
+ containerStateManager.removeContainerReplica(
+ id.getProtobuf(), replicaThree);
} else if (targetDn.equals(replicaFour.getDatanodeDetails())) {
- containerStateManager.removeContainerReplica(id, replicaFour);
+ containerStateManager.removeContainerReplica(
+ id.getProtobuf(), replicaFour);
}
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
@@ -520,7 +559,7 @@ public void testOverReplicatedQuasiClosedContainer() throws
*/
@Test
public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -533,11 +572,12 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
final ContainerReplica replicaFour = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
final int currentDeleteCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.deleteContainerCommand);
@@ -558,7 +598,7 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
.getNumDeletionCmdsCompleted();
// Now we remove the replica to simulate deletion complete
- containerStateManager.removeContainerReplica(id, replicaOne);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), replicaOne);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -575,8 +615,7 @@ public void testOverReplicatedQuasiClosedContainerWithUnhealthyReplica()
* under replicated.
*/
@Test
- public void testUnderReplicatedQuasiClosedContainer() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testUnderReplicatedQuasiClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
@@ -586,9 +625,9 @@ public void testUnderReplicatedQuasiClosedContainer() throws
final ContainerReplica replicaTwo = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
@@ -618,7 +657,8 @@ public void testUnderReplicatedQuasiClosedContainer() throws
.get(id).get(0).getDatanode();
final ContainerReplica replicatedReplicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, targetDn);
- containerStateManager.updateContainerReplica(id, replicatedReplicaThree);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicatedReplicaThree);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -653,8 +693,8 @@ public void testUnderReplicatedQuasiClosedContainer() throws
*/
@Test
public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
- throws SCMException, ContainerNotFoundException, InterruptedException,
- ContainerReplicaNotFoundException, TimeoutException {
+ throws IOException, InterruptedException,
+ TimeoutException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -663,9 +703,9 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
final ContainerReplica replicaTwo = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
@@ -690,7 +730,7 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
replicateCommand.get().getDatanodeId());
ContainerReplica newReplica = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, newNode);
- containerStateManager.updateContainerReplica(id, newReplica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), newReplica);
/*
* We have report the replica to SCM, in the next ReplicationManager
@@ -711,7 +751,7 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
Assert.assertEquals(1, replicationManager.getMetrics()
.getInflightDeletion());
- containerStateManager.removeContainerReplica(id, replicaTwo);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), replicaTwo);
final long currentDeleteCommandCompleted = replicationManager.getMetrics()
.getNumDeletionCmdsCompleted();
@@ -748,17 +788,16 @@ public void testUnderReplicatedQuasiClosedContainerWithUnhealthyReplica()
* highest BCSID.
*/
@Test
- public void testQuasiClosedToClosed() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testQuasiClosedToClosed() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerID id = container.containerID();
final Set replicas = getReplicas(id, State.QUASI_CLOSED,
randomDatanodeDetails(),
randomDatanodeDetails(),
randomDatanodeDetails());
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
final int currentCloseCommandCount = datanodeCommandHandler
@@ -779,8 +818,7 @@ public void testQuasiClosedToClosed() throws
* CLOSED and healthy.
*/
@Test
- public void testHealthyClosedContainer()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ public void testHealthyClosedContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final Set replicas = getReplicas(id, State.CLOSED,
@@ -788,9 +826,9 @@ public void testHealthyClosedContainer()
randomDatanodeDetails(),
randomDatanodeDetails());
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
replicationManager.processAll();
@@ -802,8 +840,7 @@ public void testHealthyClosedContainer()
* ReplicationManager should close the unhealthy OPEN container.
*/
@Test
- public void testUnhealthyOpenContainer()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ public void testUnhealthyOpenContainer() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.OPEN);
final ContainerID id = container.containerID();
final Set replicas = getReplicas(id, State.OPEN,
@@ -811,9 +848,9 @@ public void testUnhealthyOpenContainer()
randomDatanodeDetails());
replicas.addAll(getReplicas(id, State.UNHEALTHY, randomDatanodeDetails()));
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
final CloseContainerEventHandler closeContainerHandler =
@@ -830,8 +867,7 @@ public void testUnhealthyOpenContainer()
* ReplicationManager should skip send close command to unhealthy replica.
*/
@Test
- public void testCloseUnhealthyReplica()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ public void testCloseUnhealthyReplica() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final ContainerID id = container.containerID();
final Set replicas = getReplicas(id, State.UNHEALTHY,
@@ -839,9 +875,9 @@ public void testCloseUnhealthyReplica()
replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
replicas.addAll(getReplicas(id, State.OPEN, randomDatanodeDetails()));
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
for (ContainerReplica replica : replicas) {
- containerStateManager.updateContainerReplica(id, replica);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replica);
}
replicationManager.processAll();
@@ -863,8 +899,7 @@ public void testGeneratedConfig() {
}
@Test
- public void additionalReplicaScheduledWhenMisReplicated()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ public void additionalReplicaScheduledWhenMisReplicated() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
container.setUsedBytes(100);
final ContainerID id = container.containerID();
@@ -876,10 +911,11 @@ public void additionalReplicaScheduledWhenMisReplicated()
final ContainerReplica replicaThree = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
// Ensure a mis-replicated status is returned for any containers in this
// test where there are 3 replicas. When there are 2 or 4 replicas
@@ -939,8 +975,7 @@ public void additionalReplicaScheduledWhenMisReplicated()
}
@Test
- public void overReplicatedButRemovingMakesMisReplicated()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ public void overReplicatedButRemovingMakesMisReplicated() throws IOException {
// In this test, the excess replica should not be removed.
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
@@ -956,12 +991,13 @@ public void overReplicatedButRemovingMakesMisReplicated()
final ContainerReplica replicaFive = getReplicas(
id, State.UNHEALTHY, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
- containerStateManager.updateContainerReplica(id, replicaFive);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFive);
// Ensure a mis-replicated status is returned for any containers in this
// test where there are exactly 3 replicas checked.
@@ -993,8 +1029,7 @@ public void overReplicatedButRemovingMakesMisReplicated()
}
@Test
- public void testOverReplicatedAndPolicySatisfied() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testOverReplicatedAndPolicySatisfied() throws IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -1007,11 +1042,12 @@ public void testOverReplicatedAndPolicySatisfied() throws
final ContainerReplica replicaFour = getReplicas(
id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list.size() == 3),
@@ -1035,7 +1071,7 @@ public void testOverReplicatedAndPolicySatisfied() throws
@Test
public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
- SCMException {
+ IOException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -1050,12 +1086,13 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
final ContainerReplica replicaFive = getReplicas(
id, State.QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
- containerStateManager.loadContainer(container);
- containerStateManager.updateContainerReplica(id, replicaOne);
- containerStateManager.updateContainerReplica(id, replicaTwo);
- containerStateManager.updateContainerReplica(id, replicaThree);
- containerStateManager.updateContainerReplica(id, replicaFour);
- containerStateManager.updateContainerReplica(id, replicaFive);
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaOne);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaTwo);
+ containerStateManager.updateContainerReplica(
+ id.getProtobuf(), replicaThree);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFour);
+ containerStateManager.updateContainerReplica(id.getProtobuf(), replicaFive);
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.argThat(list -> list != null && list.size() <= 4),
@@ -1082,8 +1119,7 @@ public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
* decommissioned replicas.
*/
@Test
- public void testUnderReplicatedDueToDecommission() throws
- SCMException {
+ public void testUnderReplicatedDueToDecommission() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1096,8 +1132,7 @@ public void testUnderReplicatedDueToDecommission() throws
* are decommissioning.
*/
@Test
- public void testUnderReplicatedDueToAllDecommission() throws
- SCMException {
+ public void testUnderReplicatedDueToAllDecommission() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1110,8 +1145,7 @@ public void testUnderReplicatedDueToAllDecommission() throws
* correctly replicated with decommissioned replicas still present.
*/
@Test
- public void testCorrectlyReplicatedWithDecommission() throws
- SCMException {
+ public void testCorrectlyReplicatedWithDecommission() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1125,8 +1159,7 @@ public void testCorrectlyReplicatedWithDecommission() throws
* is not met for maintenance.
*/
@Test
- public void testUnderReplicatedDueToMaintenance() throws
- SCMException {
+ public void testUnderReplicatedDueToMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1179,8 +1212,7 @@ public void testUnderReplicatedDueToMaintenanceMinRepOne()
* are going into maintenance.
*/
@Test
- public void testUnderReplicatedDueToAllMaintenance() throws
- SCMException {
+ public void testUnderReplicatedDueToAllMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1193,8 +1225,7 @@ public void testUnderReplicatedDueToAllMaintenance() throws
* replica are available.
*/
@Test
- public void testCorrectlyReplicatedWithMaintenance() throws
- SCMException {
+ public void testCorrectlyReplicatedWithMaintenance() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1208,8 +1239,8 @@ public void testCorrectlyReplicatedWithMaintenance() throws
* are decommissioning or maintenance.
*/
@Test
- public void testUnderReplicatedWithDecommissionAndMaintenance() throws
- SCMException {
+ public void testUnderReplicatedWithDecommissionAndMaintenance()
+ throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1226,7 +1257,7 @@ public void testUnderReplicatedWithDecommissionAndMaintenance() throws
*/
@Test
public void testOverReplicatedClosedContainerWithDecomAndMaint()
- throws SCMException {
+ throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1251,7 +1282,8 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
// Get the DECOM and Maint replica and ensure none of them are scheduled
// for removal
Set decom =
- containerStateManager.getContainerReplicas(container.containerID())
+ containerStateManager.getContainerReplicas(
+ container.containerID().getProtobuf())
.stream()
.filter(r -> r.getDatanodeDetails().getPersistedOpState() != IN_SERVICE)
.collect(Collectors.toSet());
@@ -1269,8 +1301,7 @@ public void testOverReplicatedClosedContainerWithDecomAndMaint()
* scheduled.
*/
@Test
- public void testUnderReplicatedNotHealthySource()
- throws SCMException {
+ public void testUnderReplicatedNotHealthySource() throws IOException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceStale(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
@@ -1284,7 +1315,7 @@ public void testUnderReplicatedNotHealthySource()
* if all the prerequisites are satisfied, move should work as expected.
*/
@Test
- public void testMove() throws SCMException, NodeNotFoundException,
+ public void testMove() throws IOException, NodeNotFoundException,
InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
@@ -1315,7 +1346,7 @@ public void testMove() throws SCMException, NodeNotFoundException,
SCMCommandProto.Type.deleteContainerCommand, dn1.getDatanodeDetails()));
Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
- containerStateManager.removeContainerReplica(id, dn1);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -1398,10 +1429,10 @@ public void testMoveCrashAndRestart() throws IOException,
//deleteContainerCommand will be sent again
Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
SCMCommandProto.Type.deleteContainerCommand));
- containerStateManager.removeContainerReplica(id, dn1);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
//replica in src datanode is deleted now
- containerStateManager.removeContainerReplica(id, dn1);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn1);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -1422,7 +1453,7 @@ public void testMoveCrashAndRestart() throws IOException,
*/
@Test
public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
- throws SCMException, NodeNotFoundException,
+ throws IOException, NodeNotFoundException,
InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
@@ -1449,7 +1480,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
//now, replication succeeds, but replica in dn2 lost,
//and there are only tree replicas totally, so rm should
//not delete the replica on dn1
- containerStateManager.removeContainerReplica(id, dn2);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn2);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -1464,7 +1495,7 @@ public void testMoveNotDeleteSrcIfPolicyNotSatisfied()
* test src and target datanode become unhealthy when moving.
*/
@Test
- public void testDnBecameUnhealthyWhenMoving() throws SCMException,
+ public void testDnBecameUnhealthyWhenMoving() throws IOException,
NodeNotFoundException, InterruptedException, ExecutionException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
ContainerID id = container.containerID();
@@ -1508,11 +1539,11 @@ public void testDnBecameUnhealthyWhenMoving() throws SCMException,
* some Prerequisites should be satisfied.
*/
@Test
- public void testMovePrerequisites()
- throws SCMException, NodeNotFoundException,
- InterruptedException, ExecutionException {
+ public void testMovePrerequisites() throws IOException, NodeNotFoundException,
+ InterruptedException, ExecutionException,
+ InvalidStateTransitionException {
//all conditions is met
- final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ final ContainerInfo container = createContainer(LifeCycleState.OPEN);
ContainerID id = container.containerID();
ContainerReplica dn1 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1536,17 +1567,31 @@ public void testMovePrerequisites()
replicationManager.start();
Thread.sleep(100L);
- //container in not in CLOSED state
- for (LifeCycleState state : LifeCycleState.values()) {
- if (state != LifeCycleState.CLOSED) {
- container.setState(state);
- cf = replicationManager.move(id,
- new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
- Assert.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
- }
- }
- container.setState(LifeCycleState.CLOSED);
+ //container in not in OPEN state
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+ //open -> closing
+ containerStateManager.updateContainerState(id.getProtobuf(),
+ LifeCycleEvent.FINALIZE);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+ //closing -> quasi_closed
+ containerStateManager.updateContainerState(id.getProtobuf(),
+ LifeCycleEvent.QUASI_CLOSE);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ Assert.assertTrue(cf.isDone() && cf.get() ==
+ MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
+
+ //quasi_closed -> closed
+ containerStateManager.updateContainerState(id.getProtobuf(),
+ LifeCycleEvent.FORCE_CLOSE);
+ Assert.assertTrue(LifeCycleState.CLOSED ==
+ containerStateManager.getContainer(id.getProtobuf()).getState());
//Node is not in healthy state
for (HddsProtos.NodeState state : HddsProtos.NodeState.values()) {
@@ -1610,8 +1655,8 @@ public void testMovePrerequisites()
//make the replica num be 2 to test the case
//that container is in inflightReplication
- containerStateManager.removeContainerReplica(id, dn5);
- containerStateManager.removeContainerReplica(id, dn4);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn5);
+ containerStateManager.removeContainerReplica(id.getProtobuf(), dn4);
//replication manager should generate inflightReplication
replicationManager.processAll();
//waiting for inflightReplication generation
@@ -1623,8 +1668,7 @@ public void testMovePrerequisites()
}
@Test
- public void testReplicateCommandTimeout() throws
- SCMException, InterruptedException {
+ public void testReplicateCommandTimeout() throws IOException {
long timeout = new ReplicationManagerConfiguration().getEventTimeout();
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
@@ -1645,7 +1689,7 @@ public void testReplicateCommandTimeout() throws
@Test
public void testDeleteCommandTimeout() throws
- SCMException, InterruptedException {
+ IOException, InterruptedException {
long timeout = new ReplicationManagerConfiguration().getEventTimeout();
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
@@ -1667,9 +1711,9 @@ public void testDeleteCommandTimeout() throws
}
private ContainerInfo createContainer(LifeCycleState containerState)
- throws SCMException {
+ throws IOException {
final ContainerInfo container = getContainer(containerState);
- containerStateManager.loadContainer(container);
+ containerStateManager.addContainer(container.getProtobuf());
return container;
}
@@ -1707,7 +1751,7 @@ private ContainerReplica addReplicaToDn(ContainerInfo container,
final ContainerReplica replica = getReplicas(
container.containerID(), replicaState, 1000L, originNodeId, dn);
containerStateManager
- .updateContainerReplica(container.containerID(), replica);
+ .updateContainerReplica(container.containerID().getProtobuf(), replica);
return replica;
}
@@ -1741,7 +1785,9 @@ private void assertDeleteScheduled(int delta) throws InterruptedException {
public void teardown() throws Exception {
containerStateManager.close();
replicationManager.stop();
- dbStore.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
FileUtils.deleteDirectory(testDir);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
index 0be7c27336a0..3bb926324658 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestUnknownContainerReport.java
@@ -20,10 +20,13 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import java.util.UUID;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -33,13 +36,22 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,13 +67,31 @@ public class TestUnknownContainerReport {
private ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private EventPublisher publisher;
+ private PipelineManager pipelineManager;
+ private File testDir;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
@Before
public void setup() throws IOException {
- final ConfigurationSource conf = new OzoneConfiguration();
+ final OzoneConfiguration conf = SCMTestUtils.getConf();
this.nodeManager = new MockNodeManager(true, 10);
this.containerManager = Mockito.mock(ContainerManager.class);
- this.containerStateManager = new ContainerStateManager(conf);
+ testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
+ scmhaManager = MockSCMHAManager.getInstance(true);
+ pipelineManager =
+ new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+ containerStateManager = ContainerStateManagerImpl.newBuilder()
+ .setConfiguration(conf)
+ .setPipelineManager(pipelineManager)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setContainerStore(SCMDBDefinition.CONTAINERS.getTable(dbStore))
+ .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .build();
this.publisher = Mockito.mock(EventPublisher.class);
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
@@ -69,12 +99,17 @@ public void setup() throws IOException {
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws Exception {
containerStateManager.close();
+ if (dbStore != null) {
+ dbStore.close();
+ }
+
+ FileUtil.fullyDelete(testDir);
}
@Test
- public void testUnknownContainerNotDeleted() throws IOException {
+ public void testUnknownContainerNotDeleted() {
OzoneConfiguration conf = new OzoneConfiguration();
sendContainerReport(conf);
@@ -85,7 +120,7 @@ public void testUnknownContainerNotDeleted() throws IOException {
}
@Test
- public void testUnknownContainerDeleted() throws IOException {
+ public void testUnknownContainerDeleted() {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(
ScmConfig.HDDS_SCM_UNKNOWN_CONTAINER_ACTION,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
index 0fa6c9b74d90..18d46c6e519f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestReplicationAnnotation.java
@@ -20,7 +20,7 @@
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -105,10 +105,10 @@ public void testReplicateAnnotationBasic() throws Throwable {
scmhaInvocationHandler = new SCMHAInvocationHandler(
RequestType.CONTAINER, null, scmRatisServer);
- ContainerStateManagerV2 proxy =
- (ContainerStateManagerV2) Proxy.newProxyInstance(
+ ContainerStateManager proxy =
+ (ContainerStateManager) Proxy.newProxyInstance(
SCMHAInvocationHandler.class.getClassLoader(),
- new Class>[]{ContainerStateManagerV2.class}, scmhaInvocationHandler);
+ new Class>[]{ContainerStateManager.class}, scmhaInvocationHandler);
try {
proxy.addContainer(HddsProtos.ContainerInfoProto.getDefaultInstance());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index 018aadae18d2..6d4a1f4b38c8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -64,7 +64,7 @@ public class TestContainerStateManagerIntegration {
private MiniOzoneCluster cluster;
private StorageContainerManager scm;
private ContainerManager containerManager;
- private ContainerStateManagerV2 containerStateManager;
+ private ContainerStateManager containerStateManager;
private int numContainerPerOwnerInPipeline;
private Set excludedContainerIDS;