ContainerAttribute.java
@@ -19,13 +19,13 @@

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import java.util.Collections;
import java.util.Map;
import com.google.common.collect.Maps;
import java.util.EnumMap;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.Objects;
import java.util.TreeSet;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.slf4j.Logger;
@@ -38,8 +38,6 @@
* <p>
* 1. StateMap - LifeCycleState -&gt; Set of ContainerIDs
* 2. TypeMap - ReplicationType -&gt; Set of ContainerIDs
* 3. OwnerMap - OwnerNames -&gt; Set of ContainerIDs
* 4. FactorMap - ReplicationFactor -&gt; Set of ContainerIDs
* <p>
* This means that for a cluster size of 750 PB -- we will have around 150
* Million containers, if we assume 5GB average container size.
@@ -57,29 +55,27 @@
* are going to rely on ContainerStateMap locks to maintain consistency of
* data in these classes too, since ContainerAttribute is only used by
* ContainerStateMap class.
*
* @param <T> Attribute type
*/
public class ContainerAttribute<T> {
public class ContainerAttribute<T extends Enum<T>> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerAttribute.class);

private final Map<T, NavigableSet<ContainerID>> attributeMap;
private static final NavigableSet<ContainerID> EMPTY_SET = Collections
.unmodifiableNavigableSet(new ConcurrentSkipListSet<>());

/**
* Creates a Container Attribute map from an existing Map.
*
* @param attributeMap - AttributeMap
*/
public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
this.attributeMap = attributeMap;
}
private final Class<T> attributeClass;
private final ImmutableMap<T, NavigableSet<ContainerID>> attributeMap;

/**
* Create an empty Container Attribute map.
*/
public ContainerAttribute() {
this.attributeMap = new ConcurrentHashMap<>();
public ContainerAttribute(Class<T> attributeClass) {
this.attributeClass = attributeClass;

final EnumMap<T, NavigableSet<ContainerID>> map = new EnumMap<>(attributeClass);
for (T t : attributeClass.getEnumConstants()) {
map.put(t, new TreeSet<>());
}
this.attributeMap = Maps.immutableEnumMap(map);
}

/**
@@ -88,51 +84,12 @@ public ContainerAttribute() {
*
* @param key - The key to the set where the ContainerID should exist.
* @param value - Actual Container ID.
* @throws SCMException - on Error
*/
public boolean insert(T key, ContainerID value) throws SCMException {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);
attributeMap.computeIfAbsent(key, any ->
new ConcurrentSkipListSet<>()).add(value);
return true;
}

/**
* Returns true if have this bucket in the attribute map.
*
* @param key - Key to lookup
* @return true if we have the key
*/
public boolean hasKey(T key) {
Preconditions.checkNotNull(key);
return this.attributeMap.containsKey(key);
}

/**
* Returns true if we have the key and the containerID in the bucket.
*
* @param key - Key to the bucket
* @param id - container ID that we want to lookup
* @return true or false
* @return true if the value is added;
* otherwise, the value already exists, return false.
*/
public boolean hasContainerID(T key, ContainerID id) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(id);

return this.attributeMap.containsKey(key) &&
this.attributeMap.get(key).contains(id);
}

/**
* Returns true if we have the key and the containerID in the bucket.
*
* @param key - Key to the bucket
* @param id - container ID that we want to lookup
* @return true or false
*/
public boolean hasContainerID(T key, int id) {
return hasContainerID(key, ContainerID.valueOf(id));
public boolean insert(T key, ContainerID value) {
Objects.requireNonNull(value, "value == null");
return get(key).add(value);
}

/**
@@ -141,15 +98,7 @@ public boolean hasContainerID(T key, int id) {
* @param key - Key that identifies the Set.
*/
public void clearSet(T key) {
Preconditions.checkNotNull(key);

if (attributeMap.containsKey(key)) {
attributeMap.get(key).clear();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("key: {} does not exist in the attributeMap", key);
}
}
get(key).clear();
}

/**
@@ -159,24 +108,24 @@ public void clearSet(T key) {
* @param value - Container ID
*/
public boolean remove(T key, ContainerID value) {
Preconditions.checkNotNull(key);
Preconditions.checkNotNull(value);

if (attributeMap.containsKey(key)) {
if (!attributeMap.get(key).remove(value)) {
if (LOG.isDebugEnabled()) {
LOG.debug("ContainerID: {} does not exist in the set pointed by " +
"key:{}", value, key);
}
return false;
}
return true;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("key: {} does not exist in the attributeMap", key);
}
Objects.requireNonNull(value, "value == null");

if (!get(key).remove(value)) {
LOG.debug("Container {} not found in {} attribute", value, key);
return false;
}
return true;
}

NavigableSet<ContainerID> get(T attribute) {
Objects.requireNonNull(attribute, "attribute == null");

final NavigableSet<ContainerID> set = attributeMap.get(attribute);
if (set == null) {
throw new IllegalStateException("Attribute not found: " + attribute
+ " (" + attributeClass.getSimpleName() + ")");
}
return set;
}

/**
@@ -186,15 +135,7 @@ public boolean remove(T key, ContainerID value) {
* @return Underlying Set in immutable form.
*/
public NavigableSet<ContainerID> getCollection(T key) {
Preconditions.checkNotNull(key);

if (this.attributeMap.containsKey(key)) {
return ImmutableSortedSet.copyOf(this.attributeMap.get(key));
}
if (LOG.isDebugEnabled()) {
LOG.debug("No such Key. Key {}", key);
}
return EMPTY_SET;
return ImmutableSortedSet.copyOf(get(key));
}

/**
@@ -207,31 +148,22 @@ public NavigableSet<ContainerID> getCollection(T key) {
*/
public void update(T currentKey, T newKey, ContainerID value)
throws SCMException {
Preconditions.checkNotNull(currentKey);
Preconditions.checkNotNull(newKey);
// Return if container attribute not changed
if (currentKey == newKey) {
if (currentKey == newKey) { // use == for enum
return;
}
boolean removed = false;
try {
removed = remove(currentKey, value);
if (!removed) {
throw new SCMException("Unable to find key in the current key bucket",
FAILED_TO_CHANGE_CONTAINER_STATE);
}
insert(newKey, value);
} catch (SCMException ex) {
// if we removed the key, insert it back to original bucket, since the
// next insert failed.
LOG.error("error in update.", ex);
if (removed) {
insert(currentKey, value);
if (LOG.isTraceEnabled()) {
LOG.trace("reinserted the removed key. {}", currentKey);
}
}
throw ex;

Objects.requireNonNull(newKey, "newKey == null");
final boolean removed = remove(currentKey, value);
if (!removed) {
throw new SCMException("Failed to update Container " + value + " from " + currentKey + " to " + newKey
+ ": Container " + value + " not found in attribute " + currentKey,
FAILED_TO_CHANGE_CONTAINER_STATE);
}

final boolean inserted = insert(newKey, value);
if (!inserted) {
LOG.warn("Update Container {} from {} to {}: Container {} already exists in {}",
value, currentKey, newKey, value, newKey);
}
}
}
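
For reference, a minimal usage sketch of the revised enum-keyed ContainerAttribute (illustrative only, not part of the change; it assumes LifeCycleState as the key type and that ContainerAttribute lives in the org.apache.hadoop.hdds.scm.container.states package alongside ContainerStateMap):

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.states.ContainerAttribute;

public class ContainerAttributeSketch {
  public static void main(String[] args) throws Exception {
    // The constructor pre-creates one bucket per enum constant, so there is
    // no lazy computeIfAbsent(), hasKey() or EMPTY_SET handling any more.
    ContainerAttribute<LifeCycleState> states =
        new ContainerAttribute<>(LifeCycleState.class);

    ContainerID id = ContainerID.valueOf(1L);
    states.insert(LifeCycleState.OPEN, id);  // true: newly added
    states.insert(LifeCycleState.OPEN, id);  // false: already present, no exception thrown

    // update() moves the id between buckets; it throws SCMException only when
    // the id is missing from the current bucket.
    states.update(LifeCycleState.OPEN, LifeCycleState.CLOSING, id);
    System.out.println(states.getCollection(LifeCycleState.CLOSING));
  }
}

Since the buckets are now plain TreeSets behind an immutable EnumMap rather than ConcurrentSkipListSets in a ConcurrentHashMap, thread safety rests entirely on the locking done by ContainerStateMap, as the class Javadoc already notes.
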
ContainerStateMap.java
@@ -17,8 +17,6 @@

package org.apache.hadoop.hdds.scm.container.states;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -28,7 +26,6 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -75,21 +72,15 @@ public class ContainerStateMap {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerStateMap.class);

private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationConfig> repConfigMap;
private final ContainerAttribute<ReplicationType> typeMap;
private final ContainerAttribute<LifeCycleState> lifeCycleStateMap = new ContainerAttribute<>(LifeCycleState.class);
private final ContainerAttribute<ReplicationType> typeMap = new ContainerAttribute<>(ReplicationType.class);
private final Map<ContainerID, ContainerInfo> containerMap;
private final Map<ContainerID, Set<ContainerReplica>> replicaMap;

/**
* Create a ContainerStateMap.
*/
public ContainerStateMap() {
this.lifeCycleStateMap = new ContainerAttribute<>();
this.ownerMap = new ContainerAttribute<>();
this.repConfigMap = new ContainerAttribute<>();
this.typeMap = new ContainerAttribute<>();
this.containerMap = new ConcurrentHashMap<>();
this.replicaMap = new ConcurrentHashMap<>();
}
@@ -112,8 +103,6 @@ public void addContainer(final ContainerInfo info)
if (!contains(id)) {
containerMap.put(id, info);
lifeCycleStateMap.insert(info.getState(), id);
ownerMap.insert(info.getOwner(), id);
repConfigMap.insert(info.getReplicationConfig(), id);
typeMap.insert(info.getReplicationType(), id);
replicaMap.put(id, Collections.emptySet());

@@ -137,8 +126,6 @@ public void removeContainer(final ContainerID id) {
// remove operation fails?
final ContainerInfo info = containerMap.remove(id);
lifeCycleStateMap.remove(info.getState(), id);
ownerMap.remove(info.getOwner(), id);
repConfigMap.remove(info.getReplicationConfig(), id);
typeMap.remove(info.getReplicationType(), id);
replicaMap.remove(id);
LOG.trace("Container {} removed from ContainerStateMap.", id);
@@ -215,56 +202,16 @@ private void replaceReplicaSet(ContainerID containerID,
*/
public void updateState(ContainerID containerID, LifeCycleState currentState,
LifeCycleState newState) throws SCMException {
Preconditions.checkNotNull(currentState);
Preconditions.checkNotNull(newState);
if (!contains(containerID)) {
return;
}
// Return if updating state not changed
if (currentState == newState) {
LOG.debug("CurrentState and NewState are the same, return from " +
"updateState directly.");
if (currentState == newState) { // state not changed
return;
}
// TODO: Simplify this logic.
final ContainerInfo currentInfo = containerMap.get(containerID);
try {
currentInfo.setState(newState);

// We are updating two places before this update is done, these can
// fail independently, since the code needs to handle it.

// We update the attribute map, if that fails it will throw an
// exception, so no issues, if we are successful, we keep track of the
// fact that we have updated the lifecycle state in the map, and update
// the container state. If this second update fails, we will attempt to
// roll back the earlier change we did. If the rollback fails, we can
// be in an inconsistent state,

lifeCycleStateMap.update(currentState, newState, containerID);
if (LOG.isTraceEnabled()) {
LOG.trace("Updated the container {} to new state. Old = {}, new = " +
"{}", containerID, currentState, newState);
}
} catch (SCMException ex) {
LOG.error("Unable to update the container state.", ex);
// we need to revert the change in this attribute since we are not
// able to update the hash table.
LOG.info("Reverting the update to lifecycle state. Moving back to " +
"old state. Old = {}, Attempted state = {}", currentState,
newState);

currentInfo.revertState();

// if this line throws, the state map can be in an inconsistent
// state, since we will have modified the attribute by the
// container state will not in sync since we were not able to put
// that into the hash table.
lifeCycleStateMap.update(newState, currentState, containerID);

throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE);
if (currentInfo == null) { // container not found
return;
}
lifeCycleStateMap.update(currentState, newState, containerID);
LOG.trace("Updated the container {} from {} to {}", containerID, currentState, newState);
currentInfo.setState(newState);
}

public Set<ContainerID> getAllContainerIDs() {
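
For context, a test-style sketch of the simplified ContainerStateMap flow (illustrative only; the ContainerInfo.Builder setters, the containerID() accessor and the RatisReplicationConfig factory are assumptions about the surrounding SCM codebase, not part of this diff):

import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;

public class ContainerStateMapSketch {
  public static void main(String[] args) throws Exception {
    ContainerStateMap map = new ContainerStateMap();

    // Assumed builder API -- verify the exact ContainerInfo.Builder setters
    // before reusing this snippet.
    ContainerInfo info = new ContainerInfo.Builder()
        .setContainerID(1L)
        .setState(LifeCycleState.OPEN)
        .setOwner("OZONE")
        .setReplicationConfig(
            RatisReplicationConfig.getInstance(ReplicationFactor.THREE))
        .build();

    // addContainer() now indexes only the lifecycle-state and replication-type
    // attributes; the owner and replication-config attributes were removed.
    map.addContainer(info);

    // updateState() returns silently when the container is unknown or the
    // state is unchanged; otherwise it moves the id to the new lifecycle
    // bucket and then updates ContainerInfo, with no rollback handling left.
    map.updateState(info.containerID(), LifeCycleState.OPEN, LifeCycleState.CLOSING);
  }
}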