Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.NavigableSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ratis.util.Preconditions;

/**
* Each Attribute that we manage for a container is maintained as a map.
Expand Down Expand Up @@ -60,37 +61,30 @@
* @param <T> Attribute type
*/
public class ContainerAttribute<T extends Enum<T>> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerAttribute.class);

private final Class<T> attributeClass;
private final ImmutableMap<T, NavigableSet<ContainerID>> attributeMap;
private final ImmutableMap<T, NavigableMap<ContainerID, ContainerInfo>> attributeMap;

/**
* Create an empty Container Attribute map.
*/
public ContainerAttribute(Class<T> attributeClass) {
this.attributeClass = attributeClass;

final EnumMap<T, NavigableSet<ContainerID>> map = new EnumMap<>(attributeClass);
final EnumMap<T, NavigableMap<ContainerID, ContainerInfo>> map = new EnumMap<>(attributeClass);
for (T t : attributeClass.getEnumConstants()) {
map.put(t, new TreeSet<>());
map.put(t, new TreeMap<>());
}
this.attributeMap = Maps.immutableEnumMap(map);
}

/**
* Insert the value in the Attribute map, keep the original value if it exists
* already.
*
* @param key - The key to the set where the ContainerID should exist.
* @param value - Actual Container ID.
* @return true if the value is added;
* otherwise, the value already exists, return false.
* Add the given non-existing {@link ContainerInfo} to this attribute.
* @throws IllegalStateException if it already exists.
*/
public boolean insert(T key, ContainerID value) {
Objects.requireNonNull(value, "value == null");
return get(key).add(value);
public void addNonExisting(T key, ContainerInfo info) {
Objects.requireNonNull(info, "value == null");
final ContainerInfo previous = get(key).put(info.containerID(), info);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use putIfAbsent to avoid overwriting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is for adding non-existing elements. So, put is okay since the next line will throw IllegalStateException in case of overwriting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I saw that. But it leaves ContainerAttribute in the overwritten state. We could prevent that (but still throw the exception).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, for removeExisting(T key, ContainerInfo existing) we could use get(key).remove(existing.containerID, existing) to prevent removal in case it's not the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two method putIfAbsent and remove(key, value) require an additional lookup. So, let's don't use it for efficiency. Such cases are bugs that we don't expect it to happen.

Preconditions.assertNull(previous, "previous");
}

/**
Expand All @@ -103,30 +97,30 @@ public void clearSet(T key) {
}

/**
* Removes a container ID from the set pointed by the key.
*
* @param key - key to identify the set.
* @param value - Container ID
* Remove a container for the given id.
* @return the info if there was a mapping for the id; otherwise, return null
*/
public boolean remove(T key, ContainerID value) {
Objects.requireNonNull(value, "value == null");
public ContainerInfo remove(T key, ContainerID id) {
Objects.requireNonNull(id, "id == null");
return get(key).remove(id);
}

if (!get(key).remove(value)) {
LOG.debug("Container {} not found in {} attribute", value, key);
return false;
}
return true;
/** Remove an existing {@link ContainerInfo}. */
public void removeExisting(T key, ContainerInfo existing) {
Objects.requireNonNull(existing, "existing == null");
final ContainerInfo removed = remove(key, existing.containerID());
Preconditions.assertSame(existing, removed, "removed");
}

NavigableSet<ContainerID> get(T attribute) {
NavigableMap<ContainerID, ContainerInfo> get(T attribute) {
Objects.requireNonNull(attribute, "attribute == null");

final NavigableSet<ContainerID> set = attributeMap.get(attribute);
if (set == null) {
final NavigableMap<ContainerID, ContainerInfo> map = attributeMap.get(attribute);
if (map == null) {
throw new IllegalStateException("Attribute not found: " + attribute
+ " (" + attributeClass.getSimpleName() + ")");
}
return set;
return map;
}

/**
Expand All @@ -135,13 +129,13 @@ NavigableSet<ContainerID> get(T attribute) {
* @param key - Key to the bucket.
* @return Underlying Set in immutable form.
*/
public NavigableSet<ContainerID> getCollection(T key) {
return ImmutableSortedSet.copyOf(get(key));
public List<ContainerInfo> getCollection(T key) {
return new ArrayList<>(get(key).values());
}

public SortedSet<ContainerID> tailSet(T key, ContainerID start) {
public SortedMap<ContainerID, ContainerInfo> tailMap(T key, ContainerID start) {
Objects.requireNonNull(start, "start == null");
return get(key).tailSet(start);
return get(key).tailMap(start);
}

public int count(T key) {
Expand All @@ -163,17 +157,13 @@ public void update(T currentKey, T newKey, ContainerID value)
}

Objects.requireNonNull(newKey, "newKey == null");
final boolean removed = remove(currentKey, value);
if (!removed) {
final ContainerInfo removed = remove(currentKey, value);
if (removed == null) {
throw new SCMException("Failed to update Container " + value + " from " + currentKey + " to " + newKey
+ ": Container " + value + " not found in attribute " + currentKey,
FAILED_TO_CHANGE_CONTAINER_STATE);
}

final boolean inserted = insert(newKey, value);
if (!inserted) {
LOG.warn("Update Container {} from {} to {}: Container {} already exists in {}",
value, currentKey, newKey, value, newKey);
}
addNonExisting(newKey, removed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@
* select a container that belongs to user1, with Ratis replication which can
* make 3 copies of data. The fact that we will look for open containers by
* default and if we cannot find them we will add new containers.
*
* <p>
* All the calls are idempotent.
* <p>
* This class is NOT thread-safe.
*/
public class ContainerStateMap {
private static final Logger LOG =
Expand Down Expand Up @@ -167,9 +169,23 @@ ContainerReplica removeReplica(ContainerID containerID, DatanodeID datanodeID) {
return entry == null ? null : entry.removeReplica(datanodeID);
}
}

/**
* Map {@link LifeCycleState} to {@link ContainerInfo}.
* Note that a {@link ContainerInfo} can only exists in at most one of the {@link LifeCycleState}s.
*/
private final ContainerAttribute<LifeCycleState> lifeCycleStateMap = new ContainerAttribute<>(LifeCycleState.class);
/**
* Map {@link ReplicationType} to {@link ContainerInfo}.
* Note that a {@link ContainerInfo} can only exists in at most one of the {@link ReplicationType}s.
*/
private final ContainerAttribute<ReplicationType> typeMap = new ContainerAttribute<>(ReplicationType.class);
/**
* Map {@link ContainerID} to ({@link ContainerInfo} and {@link ContainerReplica}).
* Note that the following sets are exactly the same
* 1. The {@link ContainerInfo} in this map.
* 2. The {@link ContainerInfo} in the union of all the states in {@link #lifeCycleStateMap}.
* 2. The {@link ContainerInfo} in the union of all the types in {@link #typeMap}.
*/
private final ContainerMap containerMap = new ContainerMap();

/**
Expand All @@ -191,9 +207,8 @@ public static Logger getLogger() {
public void addContainer(final ContainerInfo info) {
Objects.requireNonNull(info, "info == null");
if (containerMap.addIfAbsent(info)) {
final ContainerID id = info.containerID();
lifeCycleStateMap.insert(info.getState(), id);
typeMap.insert(info.getReplicationType(), id);
lifeCycleStateMap.addNonExisting(info.getState(), info);
typeMap.addNonExisting(info.getReplicationType(), info);
LOG.trace("Added {}", info);
}
}
Expand All @@ -211,8 +226,8 @@ public void removeContainer(final ContainerID id) {
Objects.requireNonNull(id, "id == null");
final ContainerInfo info = containerMap.remove(id);
if (info != null) {
lifeCycleStateMap.remove(info.getState(), id);
typeMap.remove(info.getReplicationType(), id);
lifeCycleStateMap.removeExisting(info.getState(), info);
typeMap.removeExisting(info.getReplicationType(), info);
LOG.trace("Removed {}", info);
}
}
Expand Down Expand Up @@ -291,22 +306,17 @@ public List<ContainerInfo> getContainerInfos(ContainerID start, int count) {
*/
public List<ContainerInfo> getContainerInfos(LifeCycleState state, ContainerID start, int count) {
Preconditions.assertTrue(count >= 0, "count < 0");
return lifeCycleStateMap.tailSet(state, start).stream()
.map(this::getContainerInfo)
return lifeCycleStateMap.tailMap(state, start).values().stream()
.limit(count)
.collect(Collectors.toList());
}

public List<ContainerInfo> getContainerInfos(LifeCycleState state) {
return lifeCycleStateMap.getCollection(state).stream()
.map(this::getContainerInfo)
.collect(Collectors.toList());
return lifeCycleStateMap.getCollection(state);
}

public List<ContainerInfo> getContainerInfos(ReplicationType type) {
return typeMap.getCollection(type).stream()
.map(this::getContainerInfo)
.collect(Collectors.toList());
return typeMap.getCollection(type);
}

/** @return the number of containers for the given state. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.NavigableSet;
import java.util.NavigableMap;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.junit.jupiter.api.Test;

Expand All @@ -43,32 +44,30 @@ static <T extends Enum<T>> boolean hasContainerID(ContainerAttribute<T> attribut
}

static <T extends Enum<T>> boolean hasContainerID(ContainerAttribute<T> attribute, T key, ContainerID id) {
final NavigableSet<ContainerID> set = attribute.get(key);
return set != null && set.contains(id);
final NavigableMap<ContainerID, ContainerInfo> map = attribute.get(key);
return map != null && map.containsKey(id);
}

@Test
public void testInsert() {
public void testAddNonExisting() {
ContainerAttribute<Key> containerAttribute = new ContainerAttribute<>(Key.class);
ContainerID id = ContainerID.valueOf(42);
containerAttribute.insert(key1, id);
ContainerInfo info = new ContainerInfo.Builder().setContainerID(42).build();
ContainerID id = info.containerID();
containerAttribute.addNonExisting(key1, info);
assertEquals(1, containerAttribute.getCollection(key1).size());
assertThat(containerAttribute.getCollection(key1)).contains(id);
assertThat(containerAttribute.get(key1)).containsKey(id);

// Insert again and verify that the new ContainerId is inserted.
ContainerID newId =
ContainerID.valueOf(42);
containerAttribute.insert(key1, newId);
assertEquals(1, containerAttribute.getCollection(key1).size());
assertThat(containerAttribute.getCollection(key1)).contains(newId);
// Adding it again should fail.
assertThrows(IllegalStateException.class, () -> containerAttribute.addNonExisting(key1, info));
}

@Test
public void testClearSet() {
ContainerAttribute<Key> containerAttribute = new ContainerAttribute<>(Key.class);
for (Key k : Key.values()) {
for (int x = 1; x < 101; x++) {
containerAttribute.insert(k, ContainerID.valueOf(x));
ContainerInfo info = new ContainerInfo.Builder().setContainerID(x).build();
containerAttribute.addNonExisting(k, info);
}
}
for (Key k : Key.values()) {
Expand All @@ -85,7 +84,8 @@ public void testRemove() {

for (Key k : Key.values()) {
for (int x = 1; x < 101; x++) {
containerAttribute.insert(k, ContainerID.valueOf(x));
ContainerInfo info = new ContainerInfo.Builder().setContainerID(x).build();
containerAttribute.addNonExisting(k, info);
}
}
for (int x = 1; x < 101; x += 2) {
Expand All @@ -106,9 +106,10 @@ public void testRemove() {
@Test
public void tesUpdate() throws SCMException {
ContainerAttribute<Key> containerAttribute = new ContainerAttribute<>(Key.class);
ContainerID id = ContainerID.valueOf(42);
ContainerInfo info = new ContainerInfo.Builder().setContainerID(42).build();
ContainerID id = info.containerID();

containerAttribute.insert(key1, id);
containerAttribute.addNonExisting(key1, info);
assertTrue(hasContainerID(containerAttribute, key1, id));
assertFalse(hasContainerID(containerAttribute, key2, id));

Expand Down