Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public class ContainerStateManager {
private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
private final ContainerStateMap containers;
private final AtomicLong containerCount;
private final int numContainerPerOwnerInPipeline;

/**
* Constructs a Container State Manager that tracks all containers owned by
Expand Down Expand Up @@ -152,9 +151,6 @@ public ContainerStateManager(final Configuration configuration) {
this.lastUsedMap = new ConcurrentHashMap<>();
this.containerCount = new AtomicLong(0);
this.containers = new ContainerStateMap();
this.numContainerPerOwnerInPipeline = configuration
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
}

/*
Expand Down Expand Up @@ -362,55 +358,6 @@ void updateDeleteTransactionId(
});
}

/**
* Return a container matching the attributes specified.
*
* @param size - Space needed in the Container.
* @param owner - Owner of the container - A specific nameservice.
* @param pipelineManager - Pipeline Manager
* @param pipeline - Pipeline from which container needs to be matched
* @return ContainerInfo, null if there is no match found.
*/
ContainerInfo getMatchingContainer(final long size, String owner,
PipelineManager pipelineManager, Pipeline pipeline) throws IOException {

NavigableSet<ContainerID> containerIDs =
pipelineManager.getContainersInPipeline(pipeline.getId());
if (containerIDs == null) {
LOG.error("Container list is null for pipeline=", pipeline.getId());
return null;
}

getContainers(containerIDs, owner);
if (containerIDs.size() < numContainerPerOwnerInPipeline) {
synchronized (pipeline) {
// TODO: #CLUTIL Maybe we can add selection logic inside synchronized
// as well
containerIDs = getContainers(
pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
if (containerIDs.size() < numContainerPerOwnerInPipeline) {
ContainerInfo containerInfo =
allocateContainer(pipelineManager, owner, pipeline);
lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
containerInfo.containerID());
return containerInfo;
}
}
}

ContainerInfo containerInfo =
getMatchingContainer(size, owner, pipeline.getId(), containerIDs);
if (containerInfo == null) {
synchronized (pipeline) {
containerInfo =
allocateContainer(pipelineManager, owner, pipeline);
lastUsedMap.put(new ContainerState(owner, pipeline.getId()),
containerInfo.containerID());
}
}
// TODO: #CLUTIL cleanup entries in lastUsedMap
return containerInfo;
}

/**
* Return a container matching the attributes specified.
Expand Down Expand Up @@ -469,9 +416,6 @@ private ContainerInfo findContainerWithSpace(final long size,
final ContainerInfo containerInfo = containers.getContainerInfo(id);
if (containerInfo.getUsedBytes() + size <= this.containerSize) {
containerInfo.updateLastUsedTime();

final ContainerState key = new ContainerState(owner, pipelineID);
lastUsedMap.put(key, containerInfo.containerID());
return containerInfo;
}
}
Expand Down Expand Up @@ -523,21 +467,7 @@ ContainerInfo getContainer(final ContainerID containerID)
return containers.getContainerInfo(containerID);
}

private NavigableSet<ContainerID> getContainers(
NavigableSet<ContainerID> containerIDs, String owner) {
for (ContainerID cid : containerIDs) {
try {
if (!getContainer(cid).getOwner().equals(owner)) {
containerIDs.remove(cid);
}
} catch (ContainerNotFoundException e) {
LOG.error("Could not find container info for container id={} {}", cid,
e);
containerIDs.remove(cid);
}
}
return containerIDs;
}


void close() throws IOException {
}
Expand Down Expand Up @@ -583,4 +513,16 @@ void removeContainer(final ContainerID containerID)
containers.removeContainer(containerID);
}

/**
* Update the lastUsedmap to update with ContainerState and containerID.
* @param pipelineID
* @param containerID
* @param owner
*/
public synchronized void updateLastUsedMap(PipelineID pipelineID,
ContainerID containerID, String owner) {
lastUsedMap.put(new ContainerState(owner, pipelineID),
containerID);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Lock;
Expand All @@ -67,6 +69,7 @@ public class SCMContainerManager implements ContainerManager {
private final MetadataStore containerStore;
private final PipelineManager pipelineManager;
private final ContainerStateManager containerStateManager;
private final int numContainerPerOwnerInPipeline;

/**
* Constructs a mapping class that creates mapping between container names
Expand Down Expand Up @@ -100,6 +103,9 @@ public SCMContainerManager(final Configuration conf,
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = new ContainerStateManager(conf);
this.numContainerPerOwnerInPipeline = conf
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);

loadExistingContainers();
}
Expand Down Expand Up @@ -201,6 +207,7 @@ public List<ContainerInfo> listContainer(ContainerID startContainerID,
}
}


/**
* Allocates a new container.
*
Expand All @@ -215,23 +222,11 @@ public ContainerInfo allocateContainer(final ReplicationType type,
throws IOException {
lock.lock();
try {
final ContainerInfo containerInfo; containerInfo = containerStateManager
.allocateContainer(pipelineManager, type, replicationFactor, owner);
try {
final byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
containerStore.put(containerIDBytes,
containerInfo.getProtobuf().toByteArray());
} catch (IOException ex) {
// If adding to containerStore fails, we should remove the container
// from in-memory map.
try {
containerStateManager.removeContainer(containerInfo.containerID());
} catch (ContainerNotFoundException cnfe) {
// No need to worry much, everything is going as planned.
}
throw ex;
}
final ContainerInfo containerInfo =
containerStateManager.allocateContainer(pipelineManager, type,
replicationFactor, owner);
// Add container to DB.
addContainerToDB(containerInfo);
return containerInfo;
} finally {
lock.unlock();
Expand Down Expand Up @@ -360,16 +355,101 @@ public ContainerInfo getMatchingContainer(final long sizeRequired,
String owner, Pipeline pipeline) {
try {
//TODO: #CLUTIL See if lock is required here
return containerStateManager
.getMatchingContainer(sizeRequired, owner, pipelineManager,
pipeline);
NavigableSet<ContainerID> containerIDs =
pipelineManager.getContainersInPipeline(pipeline.getId());

containerIDs = getContainersForOwner(containerIDs, owner);
if (containerIDs.size() < numContainerPerOwnerInPipeline) {
synchronized (pipeline) {
// TODO: #CLUTIL Maybe we can add selection logic inside synchronized
// as well
containerIDs = getContainersForOwner(
pipelineManager.getContainersInPipeline(pipeline.getId()), owner);
if (containerIDs.size() < numContainerPerOwnerInPipeline) {
ContainerInfo containerInfo =
containerStateManager.allocateContainer(pipelineManager, owner,
pipeline);
// Add to DB
addContainerToDB(containerInfo);
containerStateManager.updateLastUsedMap(pipeline.getId(),
containerInfo.containerID(), owner);
return containerInfo;
}
}
}

ContainerInfo containerInfo =
containerStateManager.getMatchingContainer(sizeRequired, owner,
pipeline.getId(), containerIDs);
if (containerInfo == null) {
synchronized (pipeline) {
containerInfo =
containerStateManager.allocateContainer(pipelineManager, owner,
pipeline);
// Add to DB
addContainerToDB(containerInfo);
}
}
containerStateManager.updateLastUsedMap(pipeline.getId(),
containerInfo.containerID(), owner);
// TODO: #CLUTIL cleanup entries in lastUsedMap
return containerInfo;
} catch (Exception e) {
LOG.warn("Container allocation failed for pipeline={} requiredSize={} {}",
pipeline, sizeRequired, e);
return null;
}
}

/**
* Add newly allocated container to container DB.
* @param containerInfo
* @throws IOException
*/
private void addContainerToDB(ContainerInfo containerInfo)
throws IOException {
try {
final byte[] containerIDBytes = Longs.toByteArray(
containerInfo.getContainerID());
containerStore.put(containerIDBytes,
containerInfo.getProtobuf().toByteArray());
} catch (IOException ex) {
// If adding to containerStore fails, we should remove the container
// from in-memory map.
try {
containerStateManager.removeContainer(containerInfo.containerID());
} catch (ContainerNotFoundException cnfe) {
// This should not happen, as we are removing after adding in to
// container state cmap.
}
throw ex;
}
}

/**
* Returns the container ID's matching with specified owner.
* @param containerIDs
* @param owner
* @return NavigableSet<ContainerID>
*/
private NavigableSet<ContainerID> getContainersForOwner(
NavigableSet<ContainerID> containerIDs, String owner) {
for (ContainerID cid : containerIDs) {
try {
if (!getContainer(cid).getOwner().equals(owner)) {
containerIDs.remove(cid);
}
} catch (ContainerNotFoundException e) {
LOG.error("Could not find container info for container id={} {}", cid,
e);
containerIDs.remove(cid);
}
}
return containerIDs;
}



/**
* Returns the latest list of DataNodes where replica for given containerId
* exist. Throws an SCMException if no entry is found for given containerId.
Expand Down