Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
*/
public class ContainerManagerImpl implements ContainerManagerV2 {

/*
* TODO: Introduce container level locks.
*/

/**
*
*/
Expand All @@ -72,17 +76,18 @@ public class ContainerManagerImpl implements ContainerManagerV2 {
*
*/
public ContainerManagerImpl(
// Introduce builder for this class?
final Configuration conf, final PipelineManager pipelineManager,
final SCMHAManager scmhaManager,
final Configuration conf,
final SCMHAManager scmHaManager,
final PipelineManager pipelineManager,
final Table<ContainerID, ContainerInfo> containerStore)
throws IOException {
// Introduce builder for this class?
this.lock = new ReentrantReadWriteLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = ContainerStateManagerImpl.newBuilder()
this.containerStateManager = ContainerStateManagerImpl.newBuilder()
.setConfiguration(conf)
.setPipelineManager(pipelineManager)
.setRatisServer(scmhaManager.getRatisServer())
.setRatisServer(scmHaManager.getRatisServer())
.setContainerStore(containerStore)
.build();
}
Expand Down Expand Up @@ -275,8 +280,8 @@ public void notifyContainerReportProcessing(final boolean isFullReport,
}

@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Not yet implemented!");
public void close() throws Exception {
containerStateManager.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.container;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand All @@ -36,8 +35,8 @@
* mapping. This is used by SCM when allocating new locations and when
* looking up a key.
*/
public interface ContainerManagerV2 extends Closeable {

public interface ContainerManagerV2 extends AutoCloseable {
// TODO: Rename this to ContainerManager

/**
* Returns all the container Ids managed by ContainerManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.container;

import java.io.IOException;

import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -27,128 +26,92 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO: Add javadoc.
* Default implementation of ContainerStateManager. This implementation
* holds the Container States in-memory which is backed by a persistent store.
* The persistent store is always kept in sync with the in-memory state changes.
*/
public final class ContainerStateManagerImpl
implements ContainerStateManagerV2 {

/* **********************************************************************
* Container Life Cycle *
* *
* Event and State Transition Mapping: *
* *
* State: OPEN ----------------> CLOSING *
* Event: FINALIZE *
* *
* State: CLOSING ----------------> QUASI_CLOSED *
* Event: QUASI_CLOSE *
* *
* State: CLOSING ----------------> CLOSED *
* Event: CLOSE *
* *
* State: QUASI_CLOSED ----------------> CLOSED *
* Event: FORCE_CLOSE *
* *
* State: CLOSED ----------------> DELETING *
* Event: DELETE *
* *
* State: DELETING ----------------> DELETED *
* Event: CLEANUP *
* *
* *
* Container State Flow: *
* *
* [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] *
* (FINALIZE) | (QUASI_CLOSE) | *
* | | *
* | | *
* (CLOSE) | (FORCE_CLOSE) | *
* | | *
* | | *
* +--------->[CLOSED]<--------+ *
* | *
* (DELETE)| *
* | *
* | *
* [DELETING] *
* | *
* (CLEANUP) | *
* | *
* V *
* [DELETED] *
* *
************************************************************************/

/**
*
* Logger instance of ContainerStateManagerImpl.
*/
private static final Logger LOG = LoggerFactory.getLogger(
ContainerStateManagerImpl.class);

/**
*
* Configured container size.
*/
private final long containerSize;

/**
*
* The container ID sequence which is used to create new container.
* This will be removed once we have a Distributed Sequence ID Generator.
*/
@Deprecated
private final AtomicLong nextContainerID;

/**
*
* In-memory representation of Container States.
*/
private final ContainerStateMap containers;

/**
*
* Persistent store for Container States.
*/
private final PipelineManager pipelineManager;
private Table<ContainerID, ContainerInfo> containerStore;

/**
*
* PipelineManager instance.
*/
private Table<ContainerID, ContainerInfo> containerStore;
private final PipelineManager pipelineManager;

/**
*
* Container lifecycle state machine.
*/
private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;

/**
*
* We use the containers in round-robin fashion for operations like block
* allocation. This map is used for remembering the last used container.
*/
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;

/**
* constructs ContainerStateManagerImpl instance and loads the containers
* form the persistent storage.
*
* @param conf the Configuration
* @param pipelineManager the {@link PipelineManager} instance
* @param containerStore the persistent storage
* @throws IOException in case of error while loading the containers
*/
private ContainerStateManagerImpl(final Configuration conf,
final PipelineManager pipelineManager,
Expand All @@ -158,15 +121,17 @@ private ContainerStateManagerImpl(final Configuration conf,
this.containerStore = containerStore;
this.stateMachine = newStateMachine();
this.containerSize = getConfiguredContainerSize(conf);
this.nextContainerID = new AtomicLong();
this.nextContainerID = new AtomicLong(1L);
this.containers = new ContainerStateMap();
this.lastUsedMap = new ConcurrentHashMap<>();

initialize();
}

/**
* Creates and initializes a new Container Lifecycle StateMachine.
*
* @return the Container Lifecycle StateMachine
*/
private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {

Expand Down Expand Up @@ -208,7 +173,9 @@ private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
}

/**
* Returns the configured container size.
*
* @return the max size of container
*/
private long getConfiguredContainerSize(final Configuration conf) {
return (long) conf.getStorageSize(
Expand All @@ -218,7 +185,9 @@ private long getConfiguredContainerSize(final Configuration conf) {
}

/**
* Loads the containers from container store into memory.
*
* @throws IOException in case of error while loading the containers
*/
private void initialize() throws IOException {
TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
Expand Down Expand Up @@ -282,16 +251,20 @@ public void addContainer(final ContainerInfoProto containerInfo)

Preconditions.checkNotNull(containerInfo);
final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
if (getContainer(container.containerID()) == null) {
Preconditions.checkArgument(nextContainerID.get()
== container.containerID().getId(),
"ContainerID mismatch.");

pipelineManager.addContainerToPipeline(
container.getPipelineID(), container.containerID());
containers.addContainer(container);
nextContainerID.incrementAndGet();
}
final ContainerID containerID = container.containerID();
final PipelineID pipelineID = container.getPipelineID();

/*
* TODO:
* Check if the container already exist in in ContainerStateManager.
* This optimization can be done after moving ContainerNotFoundException
* from ContainerStateMap to ContainerManagerImpl.
*/

containerStore.put(containerID, container);
containers.addContainer(container);
pipelineManager.addContainerToPipeline(pipelineID, containerID);
nextContainerID.incrementAndGet();
}

void updateContainerState(final ContainerID containerID,
Expand Down Expand Up @@ -337,7 +310,9 @@ void removeContainer(final ContainerID containerID)
throw new UnsupportedOperationException("Not yet implemented!");
}

void close() throws IOException {
@Override
public void close() throws Exception {
containerStore.close();
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -382,7 +357,6 @@ public ContainerStateManagerV2 build() throws IOException {

final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
conf, pipelineMgr, table);
scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);

final SCMHAInvocationHandler invocationHandler =
new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
Expand Down
Loading