diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index 29f0083c0b6..bc83b8da6b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -20,6 +20,7 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
@@ -28,6 +29,9 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
 
@@ -76,7 +80,7 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
       // Synchronized block should be replaced by container lock,
       // once we have introduced lock inside ContainerInfo.
       synchronized (containerManager.getContainer(containerId)) {
-        updateContainerStats(containerId, replicaProto);
+        updateContainerStats(datanodeDetails, containerId, replicaProto);
         updateContainerState(datanodeDetails, containerId, replicaProto);
         updateContainerReplica(datanodeDetails, containerId, replicaProto);
       }
@@ -90,7 +94,8 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
    * @param replicaProto Container Replica information
    * @throws ContainerNotFoundException If the container is not present
    */
-  private void updateContainerStats(final ContainerID containerId,
+  private void updateContainerStats(final DatanodeDetails datanodeDetails,
+      final ContainerID containerId,
       final ContainerReplicaProto replicaProto)
       throws ContainerNotFoundException {
 
@@ -103,14 +108,44 @@ private void updateContainerStats(final ContainerID containerId,
         containerInfo.updateSequenceId(
             replicaProto.getBlockCommitSequenceId());
       }
+      List<ContainerReplica> otherReplicas =
+          getOtherReplicas(containerId, datanodeDetails);
+      long usedBytes = replicaProto.getUsed();
+      long keyCount = replicaProto.getKeyCount();
+      for (ContainerReplica r : otherReplicas) {
+        // Open containers are generally growing in key count and size, the
+        // overall size should be the min of all reported replicas.
+        if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
+          usedBytes = Math.min(usedBytes, r.getBytesUsed());
+          keyCount = Math.min(keyCount, r.getKeyCount());
+        } else {
+          // Containers which are not open can only shrink in size, so use the
+          // largest values reported.
+          usedBytes = Math.max(usedBytes, r.getBytesUsed());
+          keyCount = Math.max(keyCount, r.getKeyCount());
+        }
+      }
-      if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
-        containerInfo.setUsedBytes(replicaProto.getUsed());
+      if (containerInfo.getUsedBytes() != usedBytes) {
+        containerInfo.setUsedBytes(usedBytes);
+      }
+      if (containerInfo.getNumberOfKeys() != keyCount) {
+        containerInfo.setNumberOfKeys(keyCount);
       }
-      if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
-        containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
+    }
+  }
+
+  private List<ContainerReplica> getOtherReplicas(ContainerID containerId,
+      DatanodeDetails exclude) throws ContainerNotFoundException {
+    List<ContainerReplica> filteredReplicas = new ArrayList<>();
+    Set<ContainerReplica> replicas
+        = containerManager.getContainerReplicas(containerId);
+    for (ContainerReplica r : replicas) {
+      if (!r.getDatanodeDetails().equals(exclude)) {
+        filteredReplicas.add(r);
       }
     }
+    return filteredReplicas;
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 9914f8950e0..3a9ad1bc8ac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -312,14 +312,6 @@ private void processContainer(ContainerID id) {
           action -> replicas.stream()
               .noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));
 
-      /*
-       * If the container is in CLOSED state, check and update it's key count
-       * and bytes used statistics if needed.
-       */
-      if (state == LifeCycleState.CLOSED) {
-        checkAndUpdateContainerInfo(container, replicas);
-      }
-
       /*
        * We don't have to take any action if the container is healthy.
       *
@@ -773,32 +765,6 @@ private void handleUnstableContainer(final ContainerInfo container,
 
   }
 
-  /**
-   * Check and update Container key count and used bytes based on it's replica's
-   * data.
-   */
-  private void checkAndUpdateContainerInfo(final ContainerInfo container,
-      final Set<ContainerReplica> replicas) {
-    // check container key count and bytes used
-    long maxUsedBytes = 0;
-    long maxKeyCount = 0;
-    ContainerReplica[] rps = replicas.toArray(new ContainerReplica[0]);
-    for (int i = 0; i < rps.length; i++) {
-      maxUsedBytes = Math.max(maxUsedBytes, rps[i].getBytesUsed());
-      maxKeyCount = Math.max(maxKeyCount, rps[i].getKeyCount());
-    }
-    if (maxKeyCount < container.getNumberOfKeys()) {
-      LOG.debug("Container {} key count changed from {} to {}",
-          container.containerID(), container.getNumberOfKeys(), maxKeyCount);
-      container.setNumberOfKeys(maxKeyCount);
-    }
-    if (maxUsedBytes < container.getUsedBytes()) {
-      LOG.debug("Container {} used bytes changed from {} to {}",
-          container.containerID(), container.getUsedBytes(), maxUsedBytes);
-      container.setUsedBytes(maxUsedBytes);
-    }
-  }
-
   /**
    * Sends close container command for the given container to the given
    * datanode.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index c7ec835e55b..9f308fa9738 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -39,11 +39,13 @@
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static junit.framework.TestCase.assertEquals;
 import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
 import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
 
@@ -483,9 +485,167 @@ public void testQuasiClosedToClosed()
     Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
   }
 
+  @Test
+  public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
+      throws SCMException {
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, containerManager);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+
+    final ContainerReplicaProto.State replicaState
+        = ContainerReplicaProto.State.OPEN;
+    final ContainerInfo containerOne = getContainer(LifeCycleState.OPEN);
+
+    final Set<ContainerID> containerIDSet = new HashSet<>();
+    containerIDSet.add(containerOne.containerID());
+
+    containerStateManager.loadContainer(containerOne);
+    // Container loaded, no replicas reported from DNs. Expect zeros for
+    // usage values.
+    assertEquals(0L, containerOne.getUsedBytes());
+    assertEquals(0L, containerOne.getNumberOfKeys());
+
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeOne, 50L, 60L), publisher);
+
+    // Single replica reported - ensure values are updated
+    assertEquals(50L, containerOne.getUsedBytes());
+    assertEquals(60L, containerOne.getNumberOfKeys());
+
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeTwo, 50L, 60L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeThree, 50L, 60L), publisher);
+
+    // All 3 DNs are reporting the same values. Counts should be as expected.
+    assertEquals(50L, containerOne.getUsedBytes());
+    assertEquals(60L, containerOne.getNumberOfKeys());
+
+    // Now each DN reports a different lesser value. Counts should be the min
+    // reported.
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeOne, 1L, 10L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeTwo, 2L, 11L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeThree, 3L, 12L), publisher);
+
+    // All 3 DNs are reporting different values. The actual value should be the
+    // minimum.
+    assertEquals(1L, containerOne.getUsedBytes());
+    assertEquals(10L, containerOne.getNumberOfKeys());
+
+    // Have the lowest value report a higher value and ensure the new value
+    // is the minimum
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeOne, 3L, 12L), publisher);
+
+    assertEquals(2L, containerOne.getUsedBytes());
+    assertEquals(11L, containerOne.getNumberOfKeys());
+  }
+
+  @Test
+  public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
+      throws SCMException {
+    final ContainerReportHandler reportHandler = new ContainerReportHandler(
+        nodeManager, containerManager);
+    final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
+        NodeState.HEALTHY).iterator();
+
+    final DatanodeDetails datanodeOne = nodeIterator.next();
+    final DatanodeDetails datanodeTwo = nodeIterator.next();
+    final DatanodeDetails datanodeThree = nodeIterator.next();
+
+    final ContainerReplicaProto.State replicaState
+        = ContainerReplicaProto.State.CLOSED;
+    final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
+
+    final Set<ContainerID> containerIDSet = new HashSet<>();
+    containerIDSet.add(containerOne.containerID());
+
+    containerStateManager.loadContainer(containerOne);
+    // Container loaded, no replicas reported from DNs. Expect zeros for
+    // usage values.
+    assertEquals(0L, containerOne.getUsedBytes());
+    assertEquals(0L, containerOne.getNumberOfKeys());
+
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeOne, 50L, 60L), publisher);
+
+    // Single replica reported - ensure values are updated
+    assertEquals(50L, containerOne.getUsedBytes());
+    assertEquals(60L, containerOne.getNumberOfKeys());
+
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeTwo, 50L, 60L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeThree, 50L, 60L), publisher);
+
+    // All 3 DNs are reporting the same values. Counts should be as expected.
+    assertEquals(50L, containerOne.getUsedBytes());
+    assertEquals(60L, containerOne.getNumberOfKeys());
+
+    // Now each DN reports a different lesser value. Counts should be the max
+    // reported.
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeOne, 1L, 10L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeTwo, 2L, 11L), publisher);
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeThree, 3L, 12L), publisher);
+
+    // All 3 DNs are reporting different values. The actual value should be the
+    // maximum.
+    assertEquals(3L, containerOne.getUsedBytes());
+    assertEquals(12L, containerOne.getNumberOfKeys());
+
+    // Have the highest value report a lower value and ensure the new value
+    // is the new maximum
+    reportHandler.onMessage(getContainerReportFromDatanode(
+        containerOne.containerID(), replicaState,
+        datanodeThree, 1L, 10L), publisher);
+
+    assertEquals(2L, containerOne.getUsedBytes());
+    assertEquals(11L, containerOne.getNumberOfKeys());
+  }
+
+  private ContainerReportFromDatanode getContainerReportFromDatanode(
+      ContainerID containerId, ContainerReplicaProto.State state,
+      DatanodeDetails dn, long bytesUsed, long keyCount) {
+    ContainerReportsProto containerReport = getContainerReportsProto(
+        containerId, state, dn.getUuidString(), bytesUsed, keyCount);
+
+    return new ContainerReportFromDatanode(dn, containerReport);
+  }
+
   private static ContainerReportsProto getContainerReportsProto(
       final ContainerID containerId, final ContainerReplicaProto.State state,
       final String originNodeId) {
+    return getContainerReportsProto(containerId, state, originNodeId,
+        2000000000L, 100000000L);
+  }
+
+  private static ContainerReportsProto getContainerReportsProto(
+      final ContainerID containerId, final ContainerReplicaProto.State state,
+      final String originNodeId, final long usedBytes, final long keyCount) {
     final ContainerReportsProto.Builder crBuilder =
         ContainerReportsProto.newBuilder();
     final ContainerReplicaProto replicaProto =
@@ -495,8 +655,8 @@ private static ContainerReportsProto getContainerReportsProto(
         .setOriginNodeId(originNodeId)
         .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
         .setSize(5368709120L)
-        .setUsed(2000000000L)
-        .setKeyCount(100000000L)
+        .setUsed(usedBytes)
+        .setKeyCount(keyCount)
         .setReadCount(100000000L)
         .setWriteCount(100000000L)
         .setReadBytes(2000000000L)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index e451c07f56c..aeb5bc7fbd5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -270,11 +270,8 @@ public void testContainerStatistics() throws Exception {
     });
     om.deleteKey(keyArgs);
 
-    // Want for blocks to be deleted
+    // Wait for blocks to be deleted and container reports to be processed
     Thread.sleep(5000);
-    scm.getReplicationManager().processContainersNow();
-    // Wait for container statistics change
-    Thread.sleep(1000);
     containerInfos = scm.getContainerManager().getContainers();
     containerInfos.stream().forEach(container -> {
       Assert.assertEquals(0, container.getUsedBytes());
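
Note (illustration only, not part of the patch): the rule the report handler now applies is "take the minimum of all replica reports while the container is OPEN, otherwise take the maximum". Below is a minimal standalone Java sketch of that aggregation. ReplicaStat and aggregate() are hypothetical names invented for this example; they are not classes or methods in the Ozone codebase.

import java.util.List;

// Hypothetical stand-in for ContainerReplica, used only for this sketch.
final class ReplicaStat {
  final long bytesUsed;
  final long keyCount;

  ReplicaStat(long bytesUsed, long keyCount) {
    this.bytesUsed = bytesUsed;
    this.keyCount = keyCount;
  }
}

final class ContainerStatAggregation {

  // Returns {usedBytes, keyCount} for the container, combining the stats in
  // the current report with the stats last seen from the other replicas.
  static long[] aggregate(ReplicaStat reported, List<ReplicaStat> otherReplicas,
      boolean containerIsOpen) {
    long usedBytes = reported.bytesUsed;
    long keyCount = reported.keyCount;
    for (ReplicaStat r : otherReplicas) {
      if (containerIsOpen) {
        // Open containers are still growing, so the smallest report wins.
        usedBytes = Math.min(usedBytes, r.bytesUsed);
        keyCount = Math.min(keyCount, r.keyCount);
      } else {
        // Closed containers can only shrink, so the largest report wins.
        usedBytes = Math.max(usedBytes, r.bytesUsed);
        keyCount = Math.max(keyCount, r.keyCount);
      }
    }
    return new long[] {usedBytes, keyCount};
  }
}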