Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
Expand All @@ -28,6 +29,9 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;

Expand Down Expand Up @@ -76,7 +80,7 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
// Synchronized block should be replaced by container lock,
// once we have introduced lock inside ContainerInfo.
synchronized (containerManager.getContainer(containerId)) {
updateContainerStats(containerId, replicaProto);
updateContainerStats(datanodeDetails, containerId, replicaProto);
updateContainerState(datanodeDetails, containerId, replicaProto);
updateContainerReplica(datanodeDetails, containerId, replicaProto);
}
Expand All @@ -90,7 +94,8 @@ protected void processContainerReplica(final DatanodeDetails datanodeDetails,
* @param replicaProto Container Replica information
* @throws ContainerNotFoundException If the container is not present
*/
private void updateContainerStats(final ContainerID containerId,
private void updateContainerStats(final DatanodeDetails datanodeDetails,
final ContainerID containerId,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException {

Expand All @@ -103,14 +108,44 @@ private void updateContainerStats(final ContainerID containerId,
containerInfo.updateSequenceId(
replicaProto.getBlockCommitSequenceId());
}
List<ContainerReplica> otherReplicas =
getOtherReplicas(containerId, datanodeDetails);
long usedBytes = replicaProto.getUsed();
long keyCount = replicaProto.getKeyCount();
for (ContainerReplica r : otherReplicas) {
// Open containers are generally growing in key count and size, the
// overall size should be the min of all reported replicas.
if (containerInfo.getState().equals(HddsProtos.LifeCycleState.OPEN)) {
usedBytes = Math.min(usedBytes, r.getBytesUsed());
keyCount = Math.min(keyCount, r.getKeyCount());
} else {
// Containers which are not open can only shrink in size, so use the
// largest values reported.
usedBytes = Math.max(usedBytes, r.getBytesUsed());
keyCount = Math.max(keyCount, r.getKeyCount());
}
}

if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
containerInfo.setUsedBytes(replicaProto.getUsed());
if (containerInfo.getUsedBytes() != usedBytes) {
containerInfo.setUsedBytes(usedBytes);
}
if (containerInfo.getNumberOfKeys() != keyCount) {
containerInfo.setNumberOfKeys(keyCount);
}
if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
}
}

private List<ContainerReplica> getOtherReplicas(ContainerID containerId,
DatanodeDetails exclude) throws ContainerNotFoundException {
List<ContainerReplica> filteredReplicas = new ArrayList<>();
Set<ContainerReplica> replicas
= containerManager.getContainerReplicas(containerId);
for (ContainerReplica r : replicas) {
if (!r.getDatanodeDetails().equals(exclude)) {
filteredReplicas.add(r);
}
}
return filteredReplicas;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,6 @@ private void processContainer(ContainerID id) {
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));

/*
* If the container is in CLOSED state, check and update it's key count
* and bytes used statistics if needed.
*/
if (state == LifeCycleState.CLOSED) {
checkAndUpdateContainerInfo(container, replicas);
}

/*
* We don't have to take any action if the container is healthy.
*
Expand Down Expand Up @@ -773,32 +765,6 @@ private void handleUnstableContainer(final ContainerInfo container,

}

/**
* Check and update Container key count and used bytes based on it's replica's
* data.
*/
private void checkAndUpdateContainerInfo(final ContainerInfo container,
final Set<ContainerReplica> replicas) {
// check container key count and bytes used
long maxUsedBytes = 0;
long maxKeyCount = 0;
ContainerReplica[] rps = replicas.toArray(new ContainerReplica[0]);
for (int i = 0; i < rps.length; i++) {
maxUsedBytes = Math.max(maxUsedBytes, rps[i].getBytesUsed());
maxKeyCount = Math.max(maxKeyCount, rps[i].getKeyCount());
}
if (maxKeyCount < container.getNumberOfKeys()) {
LOG.debug("Container {} key count changed from {} to {}",
container.containerID(), container.getNumberOfKeys(), maxKeyCount);
container.setNumberOfKeys(maxKeyCount);
}
if (maxUsedBytes < container.getUsedBytes()) {
LOG.debug("Container {} used bytes changed from {} to {}",
container.containerID(), container.getUsedBytes(), maxUsedBytes);
container.setUsedBytes(maxUsedBytes);
}
}

/**
* Sends close container command for the given container to the given
* datanode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.mockito.Mockito;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static junit.framework.TestCase.assertEquals;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;

Expand Down Expand Up @@ -483,9 +485,167 @@ public void testQuasiClosedToClosed()
Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}

@Test
public void openContainerKeyAndBytesUsedUpdatedToMinimumOfAllReplicas()
throws SCMException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();

final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();

final ContainerReplicaProto.State replicaState
= ContainerReplicaProto.State.OPEN;
final ContainerInfo containerOne = getContainer(LifeCycleState.OPEN);

final Set<ContainerID> containerIDSet = new HashSet<>();
containerIDSet.add(containerOne.containerID());

containerStateManager.loadContainer(containerOne);
// Container loaded, no replicas reported from DNs. Expect zeros for
// usage values.
assertEquals(0L, containerOne.getUsedBytes());
assertEquals(0L, containerOne.getNumberOfKeys());

reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 50L, 60L), publisher);

// Single replica reported - ensure values are updated
assertEquals(50L, containerOne.getUsedBytes());
assertEquals(60L, containerOne.getNumberOfKeys());

reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeTwo, 50L, 60L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeThree, 50L, 60L), publisher);

// All 3 DNs are reporting the same values. Counts should be as expected.
assertEquals(50L, containerOne.getUsedBytes());
assertEquals(60L, containerOne.getNumberOfKeys());

// Now each DN reports a different lesser value. Counts should be the min
// reported.
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 1L, 10L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeTwo, 2L, 11L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeThree, 3L, 12L), publisher);

// All 3 DNs are reporting different values. The actual value should be the
// minimum.
assertEquals(1L, containerOne.getUsedBytes());
assertEquals(10L, containerOne.getNumberOfKeys());

// Have the lowest value report a higher value and ensure the new value
// is the minimum
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 3L, 12L), publisher);

assertEquals(2L, containerOne.getUsedBytes());
assertEquals(11L, containerOne.getNumberOfKeys());
}

@Test
public void notOpenContainerKeyAndBytesUsedUpdatedToMaximumOfAllReplicas()
throws SCMException {
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();

final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();

final ContainerReplicaProto.State replicaState
= ContainerReplicaProto.State.CLOSED;
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);

final Set<ContainerID> containerIDSet = new HashSet<>();
containerIDSet.add(containerOne.containerID());

containerStateManager.loadContainer(containerOne);
// Container loaded, no replicas reported from DNs. Expect zeros for
// usage values.
assertEquals(0L, containerOne.getUsedBytes());
assertEquals(0L, containerOne.getNumberOfKeys());

reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 50L, 60L), publisher);

// Single replica reported - ensure values are updated
assertEquals(50L, containerOne.getUsedBytes());
assertEquals(60L, containerOne.getNumberOfKeys());

reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeTwo, 50L, 60L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeThree, 50L, 60L), publisher);

// All 3 DNs are reporting the same values. Counts should be as expected.
assertEquals(50L, containerOne.getUsedBytes());
assertEquals(60L, containerOne.getNumberOfKeys());

// Now each DN reports a different lesser value. Counts should be the max
// reported.
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeOne, 1L, 10L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeTwo, 2L, 11L), publisher);
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeThree, 3L, 12L), publisher);

// All 3 DNs are reporting different values. The actual value should be the
// maximum.
assertEquals(3L, containerOne.getUsedBytes());
assertEquals(12L, containerOne.getNumberOfKeys());

// Have the highest value report a lower value and ensure the new value
// is the new maximumu
reportHandler.onMessage(getContainerReportFromDatanode(
containerOne.containerID(), replicaState,
datanodeThree, 1L, 10L), publisher);

assertEquals(2L, containerOne.getUsedBytes());
assertEquals(11L, containerOne.getNumberOfKeys());
}

private ContainerReportFromDatanode getContainerReportFromDatanode(
ContainerID containerId, ContainerReplicaProto.State state,
DatanodeDetails dn, long bytesUsed, long keyCount) {
ContainerReportsProto containerReport = getContainerReportsProto(
containerId, state, dn.getUuidString(), bytesUsed, keyCount);

return new ContainerReportFromDatanode(dn, containerReport);
}

private static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId,
2000000000L, 100000000L);
}

private static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long usedBytes, final long keyCount) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
Expand All @@ -495,8 +655,8 @@ private static ContainerReportsProto getContainerReportsProto(
.setOriginNodeId(originNodeId)
.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
.setSize(5368709120L)
.setUsed(2000000000L)
.setKeyCount(100000000L)
.setUsed(usedBytes)
.setKeyCount(keyCount)
.setReadCount(100000000L)
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,8 @@ public void testContainerStatistics() throws Exception {
});

om.deleteKey(keyArgs);
// Want for blocks to be deleted
// Wait for blocks to be deleted and container reports to be processed
Thread.sleep(5000);
scm.getReplicationManager().processContainersNow();
// Wait for container statistics change
Thread.sleep(1000);
containerInfos = scm.getContainerManager().getContainers();
containerInfos.stream().forEach(container -> {
Assert.assertEquals(0, container.getUsedBytes());
Expand Down