From e7eb1725ef27a4076102eab9ea5c693ed93c730e Mon Sep 17 00:00:00 2001
From: Siddhant Sangwan
Date: Fri, 6 Jun 2025 15:58:09 +0530
Subject: [PATCH 1/3] HDDS-13045. Implement Immediate Triggering of Heartbeat when Volume Full

---
 .../container/common/impl/ContainerData.java | 7 ++++
 .../container/common/impl/HddsDispatcher.java | 36 +++++++++++--------
 .../container/common/volume/HddsVolume.java | 7 ++++
 .../common/impl/TestHddsDispatcher.java | 23 ++++++++++--
 4 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 7bb59247ca53..f7526cfdc10b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
@@ -59,6 +60,8 @@ public abstract class ContainerData {
  // For now, we support only KeyValueContainer.
  private final ContainerType containerType;
+  private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);
+
  // Unique identifier for the container
  private final long containerID;
@@ -171,6 +174,10 @@ public long getContainerID() {
    return containerID;
  }
+  public AtomicBoolean getImmediateCloseActionSent() {
+    return immediateCloseActionSent;
+  }
+
  /**
   * Returns the path to base dir of the container.
   * @return Path to base dir.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index c4e03e453349..873e2bbc4a47 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
@@ -580,7 +581,9 @@ public void validateContainerCommand(
   */
  private void sendCloseContainerActionIfNeeded(Container container) {
    // We have to find a more efficient way to close a container.
-    boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
+    boolean isOpen = container != null && container.getContainerState() == State.OPEN;
+    boolean isVolumeFull = isOpen && isVolumeFullExcludingCommittedSpace(container);
+    boolean isSpaceFull = isContainerFull(container) || isVolumeFull;
    boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
    if (shouldClose) {
      ContainerData containerData = container.getContainerData();
@@ -591,6 +594,21 @@ private void sendCloseContainerActionIfNeeded(Container container) {
          .setContainerID(containerData.getContainerID())
          .setAction(ContainerAction.Action.CLOSE).setReason(reason).build();
      context.addContainerActionIfAbsent(action);
+      if (isVolumeFull) {
+        HddsVolume volume = containerData.getVolume();
+        LOG.warn("Volume [{}] is full. containerID: {}. Volume usage: [{}].", volume, containerData.getContainerID(),
+            volume.getCurrentUsage());
+      }
+      AtomicBoolean immediateCloseActionSent = containerData.getImmediateCloseActionSent();
+      // if an immediate heartbeat has not been triggered already, trigger it now
+      if (immediateCloseActionSent.compareAndSet(false, true)) {
+        context.getParent().triggerHeartbeat();
+        if (isVolumeFull) {
+          // log only if the volume is full; don't log when only the container is full,
+          // because that is expected to happen frequently
+          LOG.warn("Triggered immediate heartbeat because of full volume.");
+        }
+      }
    }
  }
@@ -608,20 +626,8 @@ private boolean isContainerFull(Container container) {
    }
  }
-  private boolean isVolumeFull(Container container) {
-    boolean isOpen = Optional.ofNullable(container)
-        .map(cont -> cont.getContainerState() == ContainerDataProto.State.OPEN)
-        .orElse(Boolean.FALSE);
-    if (isOpen) {
-      HddsVolume volume = container.getContainerData().getVolume();
-      StorageLocationReport volumeReport = volume.getReport();
-      boolean full = volumeReport.getUsableSpace() <= 0;
-      if (full) {
-        LOG.info("Container {} volume is full: {}", container.getContainerData().getContainerID(), volumeReport);
-      }
-      return full;
-    }
-    return false;
+  private boolean isVolumeFullExcludingCommittedSpace(Container container) {
+    return container.getContainerData().getVolume().isVolumeFull();
  }
  private boolean isContainerUnhealthy(Container container) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
index cc9be3892bed..d728d26f13b2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
@@ -194,6 +195,12 @@ public VolumeInfoMetrics getVolumeInfoStats() {
    return volumeInfoMetrics;
  }
+  public boolean isVolumeFull() {
+    SpaceUsageSource currentUsage = getCurrentUsage();
+    // if the volume has failed, this method implicitly returns true because the available space will be 0
+    return currentUsage.getAvailable() - getFreeSpaceToSpare(currentUsage.getCapacity()) <= 0;
+  }
+
  @Override
  protected StorageLocationReport.Builder reportBuilder() {
    StorageLocationReport.Builder builder = super.reportBuilder();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 6afcadb809b9..2afe83da9e52 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -79,6 +79,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -176,7 +177,15 @@ public void testContainerCloseActionWhenFull(
        responseTwo.getResult());
    verify(context, times(1))
        .addContainerActionIfAbsent(any(ContainerAction.class));
-
+    DatanodeStateMachine stateMachine = context.getParent();
+    // immediate heartbeat should be triggered
+    verify(stateMachine, times(1)).triggerHeartbeat();
+
+    // if we write again, the container action should get added but heartbeat should not get triggered again because
+    // of throttling
+    hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 3L), null);
+    verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
+    verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before
  } finally {
    volumeSet.shutdown();
    ContainerMetrics.remove();
@@ -276,6 +285,7 @@ public void testContainerCloseActionWhenVolumeFull(
    UUID scmId = UUID.randomUUID();
    ContainerSet containerSet = newContainerSet();
    StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+    DatanodeStateMachine stateMachine = context.getParent();
    // create a 50 byte container
    // available (160) > 100 (min free space) + 50 (container size)
    KeyValueContainerData containerData = new KeyValueContainerData(1L,
@@ -300,14 +310,21 @@ public void testContainerCloseActionWhenVolumeFull(
        conf, containerSet, volumeSet, handlers, context, metrics, null);
    hddsDispatcher.setClusterId(scmId.toString());
    containerData.getVolume().getVolumeUsage()
-        .ifPresent(usage -> usage.incrementUsedSpace(50));
-    usedSpace.addAndGet(50);
+        .ifPresent(usage -> usage.incrementUsedSpace(60));
+    usedSpace.addAndGet(60);
    ContainerCommandResponseProto response = hddsDispatcher
        .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
    assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
    verify(context, times(1))
        .addContainerActionIfAbsent(any(ContainerAction.class));
+    // verify that immediate heartbeat is triggered
+    verify(stateMachine, times(1)).triggerHeartbeat();
+    // the volume has reached the min free space boundary but this time the heartbeat should not be triggered because
+    // of throttling
+    hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+    verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
+    verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before
    // try creating another container now as the volume used has crossed
    // threshold

From be17bc8334b3d636efad9038807aee279175b4d4 Mon Sep 17 00:00:00 2001
From: Siddhant Sangwan
Date: Mon, 9 Jun 2025 16:18:11 +0530
Subject: [PATCH 2/3] modify unit test to verify per container throttling

---
 .../common/impl/TestHddsDispatcher.java | 38 +++++++++++++++----
 1 file changed, 31 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 2afe83da9e52..edbd8fd2e298 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -126,6 +126,16 @@ public static void init() {
    volumeChoosingPolicy = VolumeChoosingPolicyFactory.getPolicy(new OzoneConfiguration());
  }
+  /**
+   * Tests that the close container action is sent when a container is full. First, two containers are created. Then
+   * we write to one of them to confirm that normal writes are successful. Then we increase the used space of both
+   * containers so that they are close to full, and write to both of them. The expectation is that a close container
+   * action should be added for both of them and two immediate heartbeats should be sent. Next, we write to the first
+   * container again. This time the close container action should be queued, but an immediate heartbeat should not be
+   * sent because of throttling. This confirms that the throttling is per container.
+   * @param layout the container layout version under test
+   * @throws IOException
+   */
  @ContainerLayoutTestInfo.ContainerTest
  public void testContainerCloseActionWhenFull(
      ContainerLayoutVersion layout) throws IOException {
@@ -142,16 +152,23 @@ public void testContainerCloseActionWhenFull(
    UUID scmId = UUID.randomUUID();
    ContainerSet containerSet = newContainerSet();
    StateContext context = ContainerTestUtils.getMockContext(dd, conf);
+    // create both containers
    KeyValueContainerData containerData = new KeyValueContainerData(1L,
        layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(),
        dd.getUuidString());
+    KeyValueContainerData containerData2 = new KeyValueContainerData(2L,
+        layout, (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), dd.getUuidString());
    Container container = new KeyValueContainer(containerData, conf);
+    Container container2 = new KeyValueContainer(containerData2, conf);
    StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
        .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
    container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
        scmId.toString());
+    container2.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
+        scmId.toString());
    containerSet.addContainer(container);
+    containerSet.addContainer(container2);
    ContainerMetrics metrics = ContainerMetrics.create(conf);
    Map<ContainerType, Handler> handlers = Maps.newHashMap();
    for (ContainerType containerType : ContainerType.values()) {
@@ -160,6 +177,7 @@ public void testContainerCloseActionWhenFull(
          context.getParent().getDatanodeDetails().getUuidString(), containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
    }
+    // write successfully to first container
    HddsDispatcher hddsDispatcher = new HddsDispatcher(
        conf, containerSet, volumeSet, handlers, context, metrics, null);
    hddsDispatcher.setClusterId(scmId.toString());
@@ -169,23 +187,29 @@ public void testContainerCloseActionWhenFull(
        responseOne.getResult());
    verify(context, times(0))
        .addContainerActionIfAbsent(any(ContainerAction.class));
+    // increment used space of both containers
    containerData.setBytesUsed(Double.valueOf(
        StorageUnit.MB.toBytes(950)).longValue());
+    containerData2.setBytesUsed(Double.valueOf(StorageUnit.MB.toBytes(950)).longValue());
    ContainerCommandResponseProto responseTwo = hddsDispatcher
        .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
+    ContainerCommandResponseProto responseThree = hddsDispatcher
+        .dispatch(getWriteChunkRequest(dd.getUuidString(), 2L, 1L), null);
    assertEquals(ContainerProtos.Result.SUCCESS, responseTwo.getResult());
-    verify(context, times(1))
+    assertEquals(ContainerProtos.Result.SUCCESS, responseThree.getResult());
+    // container action should be added for both containers
+    verify(context, times(2))
        .addContainerActionIfAbsent(any(ContainerAction.class));
    DatanodeStateMachine stateMachine = context.getParent();
-    // immediate heartbeat should be triggered
-    verify(stateMachine, times(1)).triggerHeartbeat();
+    // immediate heartbeat should be triggered for both containers
+    verify(stateMachine, times(2)).triggerHeartbeat();
-    // if we write again, the container action should get added but heartbeat should not get triggered again because
-    // of throttling
+    // if we write again to container 1, the container action should get added but heartbeat should not get triggered
+    // again because of throttling
    hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 3L), null);
-    verify(context, times(2)).addContainerActionIfAbsent(any(ContainerAction.class));
-    verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before
+    verify(context, times(3)).addContainerActionIfAbsent(any(ContainerAction.class));
+    verify(stateMachine, times(2)).triggerHeartbeat(); // was called twice before
  } finally {
    volumeSet.shutdown();
    ContainerMetrics.remove();

From e130f278f485d896f44d32f0ceb890c10cb108d9 Mon Sep 17 00:00:00 2001
From: Siddhant Sangwan
Date: Fri, 13 Jun 2025 16:38:12 +0530
Subject: [PATCH 3/3] switch order of ||

---
 .../hadoop/ozone/container/common/impl/HddsDispatcher.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 873e2bbc4a47..6c35e30bcccc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -583,7 +583,7 @@ private void sendCloseContainerActionIfNeeded(Container container) {
    // We have to find a more efficient way to close a container.
    boolean isOpen = container != null && container.getContainerState() == State.OPEN;
    boolean isVolumeFull = isOpen && isVolumeFullExcludingCommittedSpace(container);
-    boolean isSpaceFull = isContainerFull(container) || isVolumeFull;
+    boolean isSpaceFull = isVolumeFull || isContainerFull(container);
    boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
    if (shouldClose) {
      ContainerData containerData = container.getContainerData();
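
Reviewer note (illustration only, not part of the patches above): the mechanism the series adds is a per-container "send the immediate heartbeat at most once" guard built on AtomicBoolean.compareAndSet, combined with a volume-full check that treats the volume as full once the available space minus the reserved free space to spare reaches zero. The sketch below restates that pattern in isolation; the names FullnessThrottle, onSpaceFull, queueCloseAction and triggerHeartbeat are invented for this example and are not the Ozone APIs touched by the patches.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Minimal sketch of the throttling pattern: every "space full" detection queues the
    // (idempotent) close action, but only the first detection for a given container
    // triggers an immediate heartbeat.
    class FullnessThrottle {
      private final AtomicBoolean immediateCloseActionSent = new AtomicBoolean(false);

      // Mirrors the volume-full condition used by the patch: the volume counts as full
      // once the available space minus the reserved headroom is exhausted.
      static boolean isVolumeFull(long availableBytes, long freeSpaceToSpareBytes) {
        return availableBytes - freeSpaceToSpareBytes <= 0;
      }

      void onSpaceFull(Runnable queueCloseAction, Runnable triggerHeartbeat) {
        queueCloseAction.run();
        if (immediateCloseActionSent.compareAndSet(false, true)) {
          triggerHeartbeat.run(); // at most once per container
        }
      }
    }

Because the flag lives in ContainerData, the throttling is per container rather than per datanode, which is what the second patch's unit test asserts.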