diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 1ef5ac23d83a..ce0d17477ec7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -42,6 +42,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
@@ -61,6 +62,8 @@ public abstract class ContainerData {
   // For now, we support only KeyValueContainer.
   private final ContainerType containerType;
 
+  private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);
+
   // Unique identifier for the container
   private final long containerID;
 
@@ -162,6 +165,10 @@ public long getContainerID() {
     return containerID;
   }
 
+  public AtomicBoolean getImmediateCloseActionSent() {
+    return immediateCloseActionSent;
+  }
+
   /**
    * Returns the path to base dir of the container.
    * @return Path to base dir.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c4e03e453349..6c35e30bcccc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -580,7 +581,9 @@ public void validateContainerCommand(
    */
   private void sendCloseContainerActionIfNeeded(Container container) {
     // We have to find a more efficient way to close a container.
-    boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
+    boolean isOpen = container != null && container.getContainerState() == State.OPEN;
+    boolean isVolumeFull = isOpen && isVolumeFullExcludingCommittedSpace(container);
+    boolean isSpaceFull = isVolumeFull || isContainerFull(container);
     boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
     if (shouldClose) {
       ContainerData containerData = container.getContainerData();
@@ -591,6 +594,21 @@ private void sendCloseContainerActionIfNeeded(Container container) {
           .setContainerID(containerData.getContainerID())
           .setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
       context.addContainerActionIfAbsent(action);
+      if (isVolumeFull) {
+        HddsVolume volume = containerData.getVolume();
+        LOG.warn("Volume [{}] is full. containerID: {}. Volume usage: [{}].", volume, containerData.getContainerID(),
+            volume.getCurrentUsage());
+      }
+      AtomicBoolean immediateCloseActionSent = containerData.getImmediateCloseActionSent();
+      // if an immediate heartbeat has not been triggered already, trigger it now
+      if (immediateCloseActionSent.compareAndSet(false, true)) {
+        context.getParent().triggerHeartbeat();
+        if (isVolumeFull) {
+          // log only if volume is full
+          // don't want to log if only container is full because that is expected to happen frequently
+          LOG.warn("Triggered immediate heartbeat because of full volume.");
+        }
+      }
     }
   }
 
@@ -608,20 +626,8 @@ private boolean isContainerFull(Container container) {
     }
   }
 
-  private boolean isVolumeFull(Container container) {
-    boolean isOpen = Optional.ofNullable(container)
-        .map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
-        .orElse(Boolean.FALSE);
-    if (isOpen) {
-      HddsVolume volume = container.getContainerData().getVolume();
-      StorageLocationReport volumeReport = volume.getReport();
-      boolean full = volumeReport.getUsableSpace() <= 0;
-      if (full) {
-        LOG.info("Container {} volume is full: {}", container.getContainerData().getContainerID(), volumeReport);
-      }
-      return full;
-    }
-    return false;
+  private boolean isVolumeFullExcludingCommittedSpace(Container container) {
+    return container.getContainerData().getVolume().isVolumeFull();
   }
 
   private boolean isContainerUnhealthy(Container container) {
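Taken together, the two hunks above implement a per-container throttle: ContainerData now carries an AtomicBoolean, and the dispatcher only triggers the out-of-band heartbeat when compareAndSet flips it from false to true, so repeated or racing writes past the full threshold cannot trigger more than one immediate heartbeat per container. A minimal standalone sketch of that pattern (illustrative only; the class and method names below are hypothetical, not Ozone APIs):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the once-per-container trigger used above; not Ozone code.
    class CloseActionThrottleSketch {
      // analogous to ContainerData#immediateCloseActionSent
      private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);

      // true only for the first caller; later (and concurrent) callers see false
      boolean shouldTriggerImmediateHeartbeat() {
        return immediateCloseActionSent.compareAndSet(false, true);
      }

      public static void main(String[] args) {
        CloseActionThrottleSketch perContainerFlag = new CloseActionThrottleSketch();
        System.out.println(perContainerFlag.shouldTriggerImmediateHeartbeat()); // true: trigger heartbeat
        System.out.println(perContainerFlag.shouldTriggerImmediateHeartbeat()); // false: throttled
      }
    }

Because compareAndSet is atomic, concurrent dispatcher threads can observe at most one false-to-true transition, which is what bounds the immediate heartbeat to one per container for the lifetime of the ContainerData object.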
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index c34a443d15c0..ca82a61d134d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -195,6 +195,12 @@ public VolumeInfoMetrics getVolumeInfoStats() {
     return volumeInfoMetrics;
   }
 
+  public boolean isVolumeFull() {
+    SpaceUsageSource currentUsage = getCurrentUsage();
+    // if the volume is failed, this method will implicitly return true because available space will be 0
+    return currentUsage.getAvailable() - getFreeSpaceToSpare(currentUsage.getCapacity()) <= 0;
+  }
+
   @Override
   protected StorageLocationReport.Builder reportBuilder() {
     StorageLocationReport.Builder builder = super.reportBuilder();
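The new HddsVolume#isVolumeFull() reports the volume as full once the available space no longer exceeds the free space the volume must keep to spare; since a failed volume reports zero available space, it is implicitly treated as full as well. A simplified sketch with plain numbers (the two-argument signature is an assumption for illustration, not the SpaceUsageSource/getFreeSpaceToSpare API):

    // Sketch: "full" means nothing is left once the reserved minimum free space is subtracted.
    final class VolumeFullCheckSketch {
      static boolean isVolumeFull(long availableBytes, long freeSpaceToSpareBytes) {
        return availableBytes - freeSpaceToSpareBytes <= 0;
      }

      public static void main(String[] args) {
        System.out.println(isVolumeFull(160, 100)); // false: 60 bytes of headroom above the reserve
        System.out.println(isVolumeFull(100, 100)); // true: the reserve is exhausted
        System.out.println(isVolumeFull(0, 100));   // true: e.g. a failed volume reporting 0 available
      }
    }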
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index d209adbb4340..1451a14f0f87 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -79,6 +79,7 @@
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -125,6 +126,16 @@ public static void init() {
     volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new OzoneConfiguration());
   }
 
+  /**
+   * Tests that the close container action is sent when a container is full. First, two containers are created. Then
+   * we write to one of them to confirm that normal writes succeed. Next, we increase the used space of both
+   * containers so that they are close to full and write to both of them. The expectation is that a close container
+   * action is added for each of them and two immediate heartbeats are sent. Finally, we write to the first container
+   * again; this time the close container action should still be queued, but no immediate heartbeat should be sent
+   * because of throttling. This confirms that the throttling is per container.
+   * @param layout
+   * @throws IOException
+   */
   @ContainerLayoutTestInfo.ContainerTest
   public void testContainerCloseActionWhenFull(
       ContainerLayoutVersion layout) throws IOException {
@@ -141,16 +152,23 @@ public void testContainerCloseActionWhenFull(
       UUID scmId = UUID.randomUUID();
       ContainerSet containerSet = newContainerSet();
       StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+      // create both containers
       KeyValueContainerData containerData = new KeyValueContainerData(1L,
           layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), dd.getUuidString());
+      KeyValueContainerData containerData2 = new KeyValueContainerData(2L,
+          layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), dd.getUuidString());
       Container container = new KeyValueContainer(containerData, conf);
+      Container container2 = new KeyValueContainer(containerData2, conf);
       StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
          .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
       container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
           scmId.toString());
+      container2.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+          scmId.toString());
       containerSet.addContainer(container);
+      containerSet.addContainer(container2);
       ContainerMetrics metrics = ContainerMetrics.create(conf);
       Map<ContainerType, Handler> handlers = Maps.newHashMap();
       for (ContainerType containerType : ContainerType.values()) {
@@ -159,6 +177,7 @@ public void testContainerCloseActionWhenFull(
             context.getParent().getDatanodeDetails().getUuidString(),
             containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
       }
+      // write successfully to first container
       HddsDispatcher hddsDispatcher = new HddsDispatcher(
           conf, containerSet, volumeSet, handlers, context, metrics, null);
       hddsDispatcher.setClusterId(scmId.toString());
@@ -168,15 +187,30 @@ public void testContainerCloseActionWhenFull(
           responseOne.getResult());
       verify(context, times(0))
           .addContainerActionIfAbsent(any(ContainerAction.class));
+      // increment used space of both containers
       containerData.getStatistics().setBlockBytesForTesting(Double.valueOf(
           StorageUnit.MB.toBytes(950)).longValue());
+      containerData2.getStatistics().setBlockBytesForTesting(Double.valueOf(
+          StorageUnit.MB.toBytes(950)).longValue());
       ContainerCommandResponseProto responseTwo = hddsDispatcher
           .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+      ContainerCommandResponseProto responseThree = hddsDispatcher
+          .dispatch(getWriteChunkRequest(dd.getUuidString(), 2L, 1L), null);
       assertEquals(ContainerProtos.Result.SUCCESS, responseTwo.getResult());
-      verify(context, times(1))
+      assertEquals(ContainerProtos.Result.SUCCESS, responseThree.getResult());
+      // container action should be added for both containers
+      verify(context, times(2))
           .addContainerActionIfAbsent(any(ContainerAction.class));
-
+      DatanodeStateMachine stateMachine = context.getParent();
+      // immediate heartbeat should be triggered for both the containers
+      verify(stateMachine, times(2)).triggerHeartbeat();
+
+      // if we write again to container 1, the container action should get added but heartbeat should not get triggered
+      // again because of throttling
+      hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 3L), null);
+      verify(context, times(3)).addContainerActionIfAbsent(any(ContainerAction.class));
+      verify(stateMachine, times(2)).triggerHeartbeat(); // was called twice before
     } finally {
       volumeSet.shutdown();
       ContainerMetrics.remove();
@@ -276,6 +310,7 @@ public void testContainerCloseActionWhenVolumeFull(
       UUID scmId = UUID.randomUUID();
       ContainerSet containerSet = newContainerSet();
       StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+      DatanodeStateMachine stateMachine = context.getParent();
       // create a 50 byte container
       // available (160) > 100 (min free space) + 50 (container size)
       KeyValueContainerData containerData = new KeyValueContainerData(1L,
@@ -300,14 +335,21 @@
           conf, containerSet, volumeSet, handlers, context, metrics, null);
       hddsDispatcher.setClusterId(scmId.toString());
       containerData.getVolume().getVolumeUsage()
-          .ifPresent(usage -> usage.incrementUsedSpace(50));
-      usedSpace.addAndGet(50);
+          .ifPresent(usage -> usage.incrementUsedSpace(60));
+      usedSpace.addAndGet(60);
       ContainerCommandResponseProto response = hddsDispatcher
           .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
       assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
       verify(context, times(1))
           .addContainerActionIfAbsent(any(ContainerAction.class));
+      // verify that immediate heartbeat is triggered
+      verify(stateMachine, times(1)).triggerHeartbeat();
+      // the volume has reached the min free space boundary but this time the heartbeat should not be triggered because
+      // of throttling
+      hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+      verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
+      verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before
       // try creating another container now as the volume used has crossed
       // threshold
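Working through the numbers in testContainerCloseActionWhenVolumeFull: per the in-test comment the volume starts with 160 bytes available and a 100-byte minimum free space. After the test writes 60 bytes, only 100 bytes remain, the reserve is exhausted, and the first write queues a close action plus one immediate heartbeat, while the second write only adds the action because the heartbeat is throttled. This also appears to be why the increment changed from 50 to 60: with 50 written, 110 bytes would remain, which still clears the reserve under the new check. A small hedged check of that arithmetic (assuming getFreeSpaceToSpare resolves to the 100-byte minimum described in the test comment):

    // Hedged worked example for the test numbers above; values come from the in-test comments.
    public class VolumeFullTestArithmetic {
      public static void main(String[] args) {
        long availableBefore = 160;  // "available (160)" per the test comment
        long freeSpaceToSpare = 100; // assumed: the 100-byte min free space from the test comment
        long written = 60;           // the test now increments used space by 60 (previously 50)

        long availableAfter = availableBefore - written;                    // 100
        System.out.println(availableAfter - freeSpaceToSpare <= 0);         // true: close action + heartbeat
        System.out.println((availableBefore - 50) - freeSpaceToSpare <= 0); // false: 50 would not have tripped it
      }
    }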