diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java index 7f21366435b3..88522f2f9f45 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java @@ -39,6 +39,8 @@ public final class ContainerID implements Comparable { LongCodec.get(), ContainerID::valueOf, c -> c.id, DelegatedCodec.CopyType.SHALLOW); + public static final ContainerID MIN = ContainerID.valueOf(0); + public static Codec getCodec() { return CODEC; } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java index 5d46bcc95771..d8188a9cf112 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java @@ -20,14 +20,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.PriorityQueue; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; +import static java.util.Comparator.naturalOrder; + /** Utility methods for Java Collections. 
*/ public interface CollectionUtils { static Map newUnmodifiableMap( @@ -89,4 +95,50 @@ public T next() { } }; } + + static > List findTopN(Iterable input, int n) { + return findTopN(input, n, any -> true); + } + + static > List findTopN( + Iterable input, + int n, + Predicate filter + ) { + return findTopN(input, n, naturalOrder(), filter); + } + + static List findTopN( + Iterable input, + int n, + Comparator comparator + ) { + return findTopN(input, n, comparator, any -> true); + } + + static List findTopN( /* n greatest items passing filter, descending */ + Iterable input, + int n, + Comparator comparator, + Predicate filter + ) { + PriorityQueue heap = new PriorityQueue<>(comparator); /* min-heap */ + + for (T item : input) { + if (filter.test(item)) { + heap.add(item); + + if (heap.size() > n) { + heap.poll(); /* evict the smallest, keeping the n greatest */ + } + } + } + + LinkedList result = new LinkedList<>(); + while (!heap.isEmpty()) { + result.addFirst(heap.poll()); /* polls ascend; prepend => descending */ + } + + return result; + } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java index 2fd773d8785e..b921a227db48 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java @@ -21,11 +21,18 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.function.Predicate; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; +import static java.util.Collections.reverseOrder; import static java.util.Collections.singletonList; +import static java.util.Comparator.naturalOrder; +import static java.util.stream.Collectors.toList; /** Test for {@link CollectionUtils}. 
*/ public class TestCollectionUtils { @@ -85,4 +92,57 @@ private static void assertIteration(List expected, CollectionUtils.newIterator(listOfLists).forEachRemaining(actual::add); Assertions.assertEquals(expected, actual); } + + @Test + void topN() { + testTopNInts(Arrays.asList(5, 8, 10, 20, 1, 2, 3, 42)); + testTopNStrings(Arrays.asList("abc", "QWERTY", "hello world", "mayday", + "a new day", "\n", "LICENSE")); + } + + private static void testTopNStrings(List strings) { + testTopN(strings, + s -> s.startsWith("a"), + s -> s.equals(s.toLowerCase())); + } + + private static void testTopNInts(List ints) { + testTopN(ints, + i -> (i % 2 == 0), + i -> (i < 20)); + } + + @SafeVarargs + private static > void testTopN(List items, + Predicate... predicates) { + + testTopN(items, naturalOrder(), any -> true); + testTopN(items, reverseOrder(), any -> true); + + for (Predicate predicate : predicates) { + testTopN(items, naturalOrder(), predicate); + testTopN(items, reverseOrder(), predicate); + } + } + + private static void testTopN(List items, Comparator comparator, + Predicate predicate) { + List shuffled = new ArrayList<>(items); + Collections.shuffle(shuffled); /* result must not depend on input order */ + + List sorted = new ArrayList<>(items); + sorted.sort(comparator.reversed()); // descending order + + for (int i = 0; i <= items.size() + 1; i++) { + assertTopN(shuffled, comparator, predicate, sorted, i); + } + assertTopN(shuffled, comparator, predicate, sorted, Integer.MAX_VALUE); + } + + private static void assertTopN(List items, Comparator comparator, + Predicate filter, List sorted, int limit) { + Assertions.assertEquals( + sorted.stream().filter(filter).limit(limit).collect(toList()), + CollectionUtils.findTopN(items, limit, comparator, filter)); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java index a4d6f36c5961..2a62e1c71dea 100644 --- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java @@ -19,19 +19,18 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; -import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -55,7 +54,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.util.Comparator.reverseOrder; import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.CONTAINER_ID; +import static org.apache.hadoop.hdds.utils.CollectionUtils.findTopN; /** * {@link ContainerManager} implementation in SCM server. 
@@ -144,34 +145,14 @@ public ContainerInfo getContainer(final ContainerID id) public List getContainers(final ContainerID startID, final int count) { scmContainerManagerMetrics.incNumListContainersOps(); - final List containersIds = - new ArrayList<>(containerStateManager.getContainerIDs()); - Collections.sort(containersIds); - List containers; - lock.lock(); - try { - containers = containersIds.stream() - .filter(id -> id.compareTo(startID) >= 0).limit(count) - .map(containerStateManager::getContainer) - .collect(Collectors.toList()); - } finally { - lock.unlock(); - } - return containers; + return toContainers(filterSortAndLimit(startID, count, + containerStateManager.getContainerIDs())); } @Override public List getContainers(final LifeCycleState state) { - List containers; - lock.lock(); - try { - containers = containerStateManager.getContainerIDs(state).stream() - .map(containerStateManager::getContainer) - .filter(Objects::nonNull).collect(Collectors.toList()); - } finally { - lock.unlock(); - } - return containers; + scmContainerManagerMetrics.incNumListContainersOps(); + return toContainers(containerStateManager.getContainerIDs(state)); } @Override @@ -179,20 +160,8 @@ public List getContainers(final ContainerID startID, final int count, final LifeCycleState state) { scmContainerManagerMetrics.incNumListContainersOps(); - final List containersIds = - new ArrayList<>(containerStateManager.getContainerIDs(state)); - Collections.sort(containersIds); - List containers; - lock.lock(); - try { - containers = containersIds.stream() - .filter(id -> id.compareTo(startID) >= 0).limit(count) - .map(containerStateManager::getContainer) - .collect(Collectors.toList()); - } finally { - lock.unlock(); - } - return containers; + return toContainers(filterSortAndLimit(startID, count, + containerStateManager.getContainerIDs(state))); } @Override @@ -431,18 +400,24 @@ public void notifyContainerReportProcessing(final boolean isFullReport, public void deleteContainer(final 
ContainerID cid) throws IOException, TimeoutException { HddsProtos.ContainerID protoId = cid.getProtobuf(); + + final boolean found; lock.lock(); try { - if (containerExist(cid)) { + found = containerExist(cid); + if (found) { containerStateManager.removeContainer(protoId); - scmContainerManagerMetrics.incNumSuccessfulDeleteContainers(); - } else { - scmContainerManagerMetrics.incNumFailureDeleteContainers(); - throwContainerNotFoundException(cid); } } finally { lock.unlock(); } + + if (found) { /* update metrics / throw only after the lock is released */ + scmContainerManagerMetrics.incNumSuccessfulDeleteContainers(); + } else { + scmContainerManagerMetrics.incNumFailureDeleteContainers(); + throwContainerNotFoundException(cid); + } } @Override @@ -471,4 +446,33 @@ public ContainerStateManager getContainerStateManager() { public SCMHAManager getSCMHAManager() { return haManager; } + + private static List filterSortAndLimit( + ContainerID startID, int count, Set set) { + + if (ContainerID.MIN.equals(startID) && count >= set.size()) { + List list = new ArrayList<>(set); + Collections.sort(list); + return list; /* shortcut: all IDs, ascending */ + } + + return findTopN(set, count, reverseOrder(), + id -> id.compareTo(startID) >= 0); /* count smallest IDs >= startID */ + } + + /** + * Returns containers for {@code ids}, skipping IDs that resolve to null. 
+ */ + private List toContainers(Collection ids) { + List containers = new ArrayList<>(ids.size()); + + for (ContainerID id : ids) { + ContainerInfo container = containerStateManager.getContainer(id); + if (container != null) { + containers.add(container); + } + } + + return containers; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java index 1e4eee0d66de..c6f15be5d2cf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container.states; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedSet; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.slf4j.Logger; @@ -189,7 +190,7 @@ public NavigableSet getCollection(T key) { Preconditions.checkNotNull(key); if (this.attributeMap.containsKey(key)) { - return Collections.unmodifiableNavigableSet(this.attributeMap.get(key)); + return ImmutableSortedSet.copyOf(this.attributeMap.get(key)); } if (LOG.isDebugEnabled()) { LOG.debug("No such Key. 
Key {}", key); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 2d74160fc436..438e9709bffd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerReplica; @@ -295,7 +296,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, } public Set getAllContainerIDs() { - return Collections.unmodifiableSet(containerMap.keySet()); + return ImmutableSet.copyOf(containerMap.keySet()); } /** diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java index ffe725ea2b72..dca8498e38ff 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java @@ -19,6 +19,8 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -30,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps; import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub; @@ -49,6 +52,8 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import static java.util.Collections.emptyList; +import static java.util.stream.Collectors.toList; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN; @@ -118,66 +123,82 @@ void testUpdateContainerState() throws Exception { RatisReplicationConfig.getInstance( ReplicationFactor.THREE), "admin"); final ContainerID cid = container.containerID(); - Assertions.assertEquals(HddsProtos.LifeCycleState.OPEN, + Assertions.assertEquals(LifeCycleState.OPEN, containerManager.getContainer(cid).getState()); containerManager.updateContainerState(cid, HddsProtos.LifeCycleEvent.FINALIZE); - Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSING, + Assertions.assertEquals(LifeCycleState.CLOSING, containerManager.getContainer(cid).getState()); containerManager.updateContainerState(cid, HddsProtos.LifeCycleEvent.QUASI_CLOSE); - Assertions.assertEquals(HddsProtos.LifeCycleState.QUASI_CLOSED, + Assertions.assertEquals(LifeCycleState.QUASI_CLOSED, containerManager.getContainer(cid).getState()); containerManager.updateContainerState(cid, HddsProtos.LifeCycleEvent.FORCE_CLOSE); - Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSED, + Assertions.assertEquals(LifeCycleState.CLOSED, containerManager.getContainer(cid).getState()); } @Test void testGetContainers() throws Exception { - Assertions.assertTrue( - containerManager.getContainers().isEmpty()); + Assertions.assertEquals(emptyList(), containerManager.getContainers()); - ContainerID[] cidArray = new ContainerID[10]; + List ids = new ArrayList<>(); for (int i = 0; i < 10; i++) { ContainerInfo container = containerManager.allocateContainer( - RatisReplicationConfig.getInstance( 
- ReplicationFactor.THREE), "admin"); - cidArray[i] = container.containerID(); + RatisReplicationConfig.getInstance(ReplicationFactor.THREE), "admin"); + ids.add(container.containerID()); } - Assertions.assertEquals(10, - containerManager.getContainers(cidArray[0], 10).size()); - Assertions.assertEquals(10, - containerManager.getContainers(cidArray[0], 100).size()); + assertIds(ids, + containerManager.getContainers(ContainerID.MIN, 10)); + assertIds(ids.subList(0, 5), + containerManager.getContainers(ContainerID.MIN, 5)); - containerManager.updateContainerState(cidArray[0], + assertIds(ids, containerManager.getContainers(ids.get(0), 10)); + assertIds(ids, containerManager.getContainers(ids.get(0), 100)); + assertIds(ids.subList(5, ids.size()), + containerManager.getContainers(ids.get(5), 100)); + assertIds(emptyList(), + containerManager.getContainers(ids.get(5), 100, LifeCycleState.CLOSED)); + + containerManager.updateContainerState(ids.get(0), HddsProtos.LifeCycleEvent.FINALIZE); - Assertions.assertEquals(9, - containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size()); - Assertions.assertEquals(1, containerManager - .getContainers(HddsProtos.LifeCycleState.CLOSING).size()); - containerManager.updateContainerState(cidArray[1], + assertIds(ids.subList(0, 1), + containerManager.getContainers(LifeCycleState.CLOSING)); + assertIds(ids.subList(1, ids.size()), + containerManager.getContainers(LifeCycleState.OPEN)); + + containerManager.updateContainerState(ids.get(1), HddsProtos.LifeCycleEvent.FINALIZE); - Assertions.assertEquals(8, - containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size()); - Assertions.assertEquals(2, containerManager - .getContainers(HddsProtos.LifeCycleState.CLOSING).size()); - containerManager.updateContainerState(cidArray[1], + assertIds(ids.subList(0, 2), + containerManager.getContainers(LifeCycleState.CLOSING)); + assertIds(ids.subList(2, ids.size()), + containerManager.getContainers(LifeCycleState.OPEN)); + + 
containerManager.updateContainerState(ids.get(1), HddsProtos.LifeCycleEvent.QUASI_CLOSE); - containerManager.updateContainerState(cidArray[2], + containerManager.updateContainerState(ids.get(2), HddsProtos.LifeCycleEvent.FINALIZE); - containerManager.updateContainerState(cidArray[2], + containerManager.updateContainerState(ids.get(2), HddsProtos.LifeCycleEvent.CLOSE); Assertions.assertEquals(7, containerManager. - getContainerStateCount(HddsProtos.LifeCycleState.OPEN)); + getContainerStateCount(LifeCycleState.OPEN)); Assertions.assertEquals(1, containerManager - .getContainerStateCount(HddsProtos.LifeCycleState.CLOSING)); + .getContainerStateCount(LifeCycleState.CLOSING)); Assertions.assertEquals(1, containerManager - .getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED)); + .getContainerStateCount(LifeCycleState.QUASI_CLOSED)); Assertions.assertEquals(1, containerManager - .getContainerStateCount(HddsProtos.LifeCycleState.CLOSED)); + .getContainerStateCount(LifeCycleState.CLOSED)); + } + + private static void assertIds( + List expected, + List containers + ) { + Assertions.assertEquals(expected, containers.stream() + .map(ContainerInfo::containerID) + .collect(toList())); } @Test