HddsDispatcher.java
@@ -32,6 +32,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
@@ -44,6 +45,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
@@ -110,6 +112,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private long slowOpThresholdNs;
private AtomicLong fullVolumeLastHeartbeatTriggerMs;
private long fullVolumeHeartbeatThrottleIntervalMs;

/**
* Constructs an OzoneContainer that receives calls from
@@ -130,6 +134,10 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;
fullVolumeLastHeartbeatTriggerMs = new AtomicLong(-1);
long heartbeatInterval =
config.getTimeDuration("hdds.heartbeat.interval", 30000, TimeUnit.MILLISECONDS);
ChenSammi (Contributor) commented on May 27, 2025:

Can we call HddsServerUtil#getScmHeartbeatInterval instead?

Also, there is HDDS_NODE_REPORT_INTERVAL for the node report. Shall we use the node report property instead of the heartbeat property?

Author (Contributor):

HDDS_NODE_REPORT_INTERVAL is 1 minute; might that be too long?

ChenSammi (Contributor) commented on May 30, 2025:

1m or 3s doesn't matter, because the first heartbeat is always sent out immediately. This 1m is only used to control the throttling, right?

Author (Contributor):

Yes, it's for throttling.
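A minimal sketch of the two alternatives raised in this thread; the HddsServerUtil signature and the node-report key/default are assumptions of this sketch, not verified against the Ozone codebase:

    // Option 1 (sketch, signature assumed): reuse the existing helper instead of the raw key.
    long heartbeatIntervalMs = HddsServerUtil.getScmHeartbeatInterval(config);

    // Option 2 (sketch, key and default assumed): throttle on the node report interval,
    // which defaults to 1 minute per this thread.
    long nodeReportIntervalMs = config.getTimeDuration(
        "hdds.node.report.interval", 60000, TimeUnit.MILLISECONDS);

    // Either way, the throttle window stays capped at 30 seconds.
    fullVolumeHeartbeatThrottleIntervalMs = Math.min(nodeReportIntervalMs, 30000);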

fullVolumeHeartbeatThrottleIntervalMs = Math.min(heartbeatInterval, 30000);

protocolMetrics =
new ProtocolMessageMetrics<>(
@@ -335,7 +343,15 @@ && getMissingContainerSet().contains(containerID)) {
// Small performance optimization. We check if the operation is of type
// write before trying to send CloseContainerAction.
if (!HddsUtils.isReadOnly(msg)) {
sendCloseContainerActionIfNeeded(container);
boolean isFull = isVolumeFull(container);
sendCloseContainerActionIfNeeded(container, isFull);
if (isFull) {
try {
handleFullVolume(container.getContainerData().getVolume());
} catch (StorageContainerException e) {
ContainerUtils.logAndReturnError(LOG, e, msg);
Reviewer (Contributor):

Are we going to return here?

Author (Contributor):

Good catch, but I'm not sure. There was an exception in getting the node report, but does that mean we should fail the write? Maybe we should still let the write continue here; otherwise, because of an intermittent or non-severe exception, we could keep failing writes. What do you think?

Reviewer (Contributor):

It's OK not to return here, but instead of calling ContainerUtils.logAndReturnError, you can probably just log the failure message.

Author (Contributor):

To test whether the logging is proper, I added a new test that throws an exception. Here's what the logs look like:

2025-05-30 16:01:08,027 [main] WARN  impl.HddsDispatcher (HddsDispatcher.java:dispatchRequest(354)) - Failed to handle full volume while handling request: cmdType: WriteChunk
containerID: 1
datanodeUuid: "c6842f19-cbc5-47ca-bce0-f5bc859ef807"
writeChunk {
  blockID {
    containerID: 1
    localID: 1
    blockCommitSequenceId: 0
  }
  chunkData {
    chunkName: "36b4d6b58215a7da96e3bf71a602e3ea_stream_1_chunk_1"
    offset: 0
    len: 36
    checksumData {
      type: NONE
      bytesPerChecksum: 0
    }
  }
  data: "b0bc4858-a308-417d-b363-0631e07b97ec"
}

org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException: Failed to create node report when handling full volume /var/folders/jp/39hcmgjx4yb_kry3ydxb3c7r0000gn/T/junit-110499014917526916. Volume Report: { id=DS-db481691-4055-404b-8790-f375e6d41215 dir=/var/folders/jp/39hcmgjx4yb_kry3ydxb3c7r0000gn/T/junit-110499014917526916/hdds type=DISK capacity=499 used=390 available=109 minFree=100 committed=50 }
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.handleFullVolume(HddsDispatcher.java:481)
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.dispatchRequest(HddsDispatcher.java:352)
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.lambda$dispatch$1(HddsDispatcher.java:199)
	at org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher.processRequest(OzoneProtocolMessageDispatcher.java:87)
	at org.apache.hadoop.ozone.container.common.impl.HddsDispatcher.dispatch(HddsDispatcher.java:198)
	at org.apache.hadoop.ozone.container.common.impl.TestHddsDispatcher.testExceptionHandlingWhenVolumeFull(TestHddsDispatcher.java:430)
...
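A minimal sketch of the reviewer's suggestion (log only, let the write continue) at this call site; the WARN message mirrors the log line shown above, and this is only an illustration, not the merged code:

    } catch (StorageContainerException e) {
      // Sketch only: log and fall through instead of returning an error response,
      // so an intermittent node-report failure does not fail the write itself.
      LOG.warn("Failed to handle full volume while handling request: {}", msg, e);
    }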

}
}
}
Handler handler = getHandler(containerType);
if (handler == null) {
@@ -403,7 +419,7 @@ && getMissingContainerSet().contains(containerID)) {
// in any case, the in memory state of the container should be unhealthy
Preconditions.checkArgument(
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
sendCloseContainerActionIfNeeded(container, isVolumeFull(container));
}
if (cmdType == Type.CreateContainer
&& result == Result.SUCCESS && dispatcherContext != null) {
@@ -435,6 +451,37 @@ && getMissingContainerSet().contains(containerID)) {
}
}

/**
* If the volume is full, we need to inform SCM about the latest volume usage stats and send the close container
* action for this container immediately. {@link HddsDispatcher#sendCloseContainerActionIfNeeded(Container, boolean)}
* just adds the action to the heartbeat. Here, we get the latest storage statistics for this node, add them to the
* heartbeat, and then send the heartbeat (including container close action) immediately.
* @param volume the volume being written to
*/
private void handleFullVolume(HddsVolume volume) throws StorageContainerException {
long current = System.currentTimeMillis();
long last = fullVolumeLastHeartbeatTriggerMs.get();
Reviewer (Contributor):

Consider the case where different volumes get full: for example, at P0 /data1 gets full, at P1 /data2 gets full, and (P1 - P0) < interval. Do we expect two emergency container reports, or one report?

Author (Contributor):

Currently we will only send one report. I think this is fine because the report includes info about all the volumes. However, there's a discussion going on here: #8460 (comment).

Reviewer (Contributor):

I don't have a good answer for this after thinking about it for a while. Ideally, if we want to send an immediate heartbeat when one volume is full, we should respect each volume and send a heartbeat for each volume when it becomes full, but considering the complexity introduced to achieve that, I doubt it's worth doing.

Apart from the heartbeat sent here, there are regular node reports with storage info every 60s. If we only send one report regardless of which volume, then we probably only need to send the first one and let the regular periodic node reports do the rest.

Author (Contributor):

OK, let's stick to the current implementation then. I'll change the interval to the node report interval instead of the heartbeat interval.

Another reviewer (Contributor):

I think the purpose of sending a full-volume report is to avoid pipeline and container creation. Now the node report is throttled, and hence the close-container action is implicitly throttled too. The initial purpose was to close the container immediately, to avoid new block allocation during the heartbeat interval (i.e. 30 seconds).

This may be similar to sending the DN heartbeat; the only advantage here is that the first failure within 1 minute is handled immediately, while all later failures are throttled.

For the node report, there is a new configuration discovered at SCM to avoid new container allocation, "hdds.datanode.storage.utilization.critical.threshold". We need to recheck the overall target of the problem to solve and optimize the configuration / fix the inconsistency.

cc: @ChenSammi

Author (Contributor), replying to the note about "hdds.datanode.storage.utilization.critical.threshold":

As discussed, this is dead code in Ozone and is not used anywhere.
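For reference, the per-volume variant discussed above (and set aside as not worth the complexity) might look roughly like the following; this is a hypothetical sketch of the rejected alternative, not code from this PR, and it assumes java.util.Map and java.util.concurrent.ConcurrentHashMap are available:

    // Hypothetical sketch: throttle per volume instead of one timestamp per datanode.
    private final Map<HddsVolume, Long> fullVolumeLastTriggerMs = new ConcurrentHashMap<>();

    private boolean shouldTriggerHeartbeatFor(HddsVolume volume, long nowMs) {
      Long last = fullVolumeLastTriggerMs.get(volume);
      if (last == null) {
        // First full-volume event for this volume; only one racing thread wins.
        return fullVolumeLastTriggerMs.putIfAbsent(volume, nowMs) == null;
      }
      if (nowMs - last < fullVolumeHeartbeatThrottleIntervalMs) {
        return false;  // this volume already triggered a heartbeat recently
      }
      // Atomically claim the new trigger time; losers of the race skip the heartbeat.
      return fullVolumeLastTriggerMs.replace(volume, last, nowMs);
    }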

boolean isFirstTrigger = last == -1;
boolean allowedToTrigger = (current - fullVolumeHeartbeatThrottleIntervalMs) >= last;
if (isFirstTrigger || allowedToTrigger) {
if (fullVolumeLastHeartbeatTriggerMs.compareAndSet(last, current)) {
StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReport;
try {
nodeReport = context.getParent().getContainer().getNodeReport();
context.refreshFullReport(nodeReport);
context.getParent().triggerHeartbeat();
LOG.info("Triggering heartbeat for full volume {}, with node report: {}.", volume, nodeReport);
Author (Contributor):

This is on the write path, so we must be extra careful about performance. An info log will reduce performance, but I wonder if it's OK in this case because this won't happen often? What do others think?

Author (Contributor):

Moreover, the future plan is to fail the write anyway if the size exceeds the min-free and reserved-space boundary.
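A hypothetical sketch of that future plan, to make the intent concrete; the Result code and the two-argument StorageContainerException constructor are assumptions here, and this is not part of this PR:

    // Sketch only: reject the write outright once the volume is past its
    // min-free / reserved-space boundary, instead of merely closing the container.
    if (isVolumeFull(container)) {
      HddsVolume volume = container.getContainerData().getVolume();
      throw new StorageContainerException(
          "Volume " + volume.getVolumeRootDir() + " is out of space; rejecting write",
          Result.DISK_OUT_OF_SPACE);
    }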

} catch (IOException e) {
String volumePath = volume.getVolumeRootDir();
StorageLocationReport volumeReport = volume.getReport();
String error = String.format(
"Failed to create node report when handling full volume %s. Volume Report: %s", volumePath, volumeReport);
throw new StorageContainerException(error, e, Result.IO_EXCEPTION);
}
}
}
}

private long getSlowOpThresholdMs(ConfigurationSource config) {
return config.getTimeDuration(
HddsConfigKeys.HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_KEY,
@@ -578,9 +625,9 @@ public void validateContainerCommand(
* marked unhealthy we send Close ContainerAction to SCM.
* @param container current state of container
*/
private void sendCloseContainerActionIfNeeded(Container container) {
private void sendCloseContainerActionIfNeeded(Container container, boolean isVolumeFull) {
// We have to find a more efficient way to close a container.
boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
boolean isSpaceFull = isContainerFull(container) || isVolumeFull;
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
TestHddsDispatcher.java
@@ -28,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -63,6 +64,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -79,6 +81,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -93,6 +96,7 @@
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -177,6 +181,9 @@ public void testContainerCloseActionWhenFull(
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));

// since the volume is not full, context.refreshFullReport(NodeReportProto) should not be called
verify(context, times(0)).refreshFullReport(any());

} finally {
volumeSet.shutdown();
ContainerMetrics.remove();
@@ -276,6 +283,16 @@ public void testContainerCloseActionWhenVolumeFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);

// empty report object for testing that an immediate heartbeat is triggered
StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb
= StorageContainerDatanodeProtocolProtos.
NodeReportProto.newBuilder();
StorageContainerDatanodeProtocolProtos.NodeReportProto reportProto = nrb.build();
DatanodeStateMachine stateMachine = context.getParent();
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
doReturn(ozoneContainer).when(stateMachine).getContainer();
doReturn(reportProto).when(ozoneContainer).getNodeReport();
// create a 50 byte container
// available (160) > 100 (min free space) + 50 (container size)
KeyValueContainerData containerData = new KeyValueContainerData(1L,
@@ -308,6 +325,15 @@ && getMissingContainerSet().contains(containerID)) {
response.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));
// verify that node report is refreshed and heartbeat is triggered
verify(context, times(1)).refreshFullReport(eq(reportProto));
verify(stateMachine, times(1)).triggerHeartbeat();

// the volume is past the min free space boundary but this time the heartbeat should not be triggered because
// of throttling
hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
verify(context, times(1)).refreshFullReport(eq(reportProto)); // was called once before
verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before

// try creating another container now as the volume used has crossed
// threshold