Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.hadoop.ozone.container.common.impl;

import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
Expand All @@ -32,6 +34,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
Expand All @@ -45,6 +48,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
Expand Down Expand Up @@ -110,6 +114,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private long slowOpThresholdNs;
private AtomicLong fullVolumeLastHeartbeatTriggerMs;
private long fullVolumeHeartbeatThrottleIntervalMs;

/**
* Constructs an OzoneContainer that receives calls from
Expand All @@ -130,6 +136,10 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;
fullVolumeLastHeartbeatTriggerMs = new AtomicLong(-1);
long nodeReportInterval = conf.getTimeDuration(HDDS_NODE_REPORT_INTERVAL, HDDS_NODE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
fullVolumeHeartbeatThrottleIntervalMs = Math.min(nodeReportInterval, 60000); // min of interval and 1 minute

protocolMetrics =
new ProtocolMessageMetrics<>(
Expand Down Expand Up @@ -335,7 +345,11 @@ && getMissingContainerSet().contains(containerID)) {
// Small performance optimization. We check if the operation is of type
// write before trying to send CloseContainerAction.
if (!HddsUtils.isReadOnly(msg)) {
sendCloseContainerActionIfNeeded(container);
boolean isFull = isVolumeFull(container);
sendCloseContainerActionIfNeeded(container, isFull);
if (isFull) {
handleFullVolume(container.getContainerData().getVolume(), msg);
}
}
Handler handler = getHandler(containerType);
if (handler == null) {
Expand Down Expand Up @@ -403,7 +417,7 @@ && getMissingContainerSet().contains(containerID)) {
// in any case, the in memory state of the container should be unhealthy
Preconditions.checkArgument(
container.getContainerData().getState() == State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
sendCloseContainerActionIfNeeded(container, isVolumeFull(container));
}
if (cmdType == Type.CreateContainer
&& result == Result.SUCCESS && dispatcherContext != null) {
Expand Down Expand Up @@ -435,6 +449,40 @@ && getMissingContainerSet().contains(containerID)) {
}
}

/**
* If the volume is full, we need to inform SCM about the latest volume usage stats and send the close container
* action for this container immediately. {@link HddsDispatcher#sendCloseContainerActionIfNeeded(Container, boolean)}
* just adds the action to the heartbeat. Here, we get the latest storage statistics for this node, add them to the
* heartbeat, and then send the heartbeat (including container close action) immediately.
* @param volume the volume being written to
*/
private void handleFullVolume(HddsVolume volume, ContainerCommandRequestProto request) {
long current = System.currentTimeMillis();
long last = fullVolumeLastHeartbeatTriggerMs.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider different volume gets full case , for example, P0, /data1 gets full, P1, /data2 gets full,
(P1-P0) < interval, do we expect two emergent container reports, or one report?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we will only send one report. I think this is fine because in the report we send info about all the volumes. However there's a discussion going on here #8460 (comment).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a good answer for this after thought for a while. The ideal state is if we want to send immediate heartbeat when one volume is full, we should respect each volume, send a heartbeat for each volume when it's full, but consider the complexity introduced to achieve that, I just doubt whether it's worthy to do that.

Because except the heartbeat sent here, there are regular node reports with storage info sent every 60s. If we only sent one report regardless of which volume, them probably we only need to sent the first one, and let the regular periodic node reports do the rest thing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, let's stick to the current implementation then. I'll change the interval to node report interval instead of heartbeat interval.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think purpose of sending full volume report is avoiding pipeline and container creation. Now node report is throttled and hence close container is throttled implicitly. Initial purpose was close container immediate to avoid new block allocation for the HB time (ie 30 second).

This may be similar to sending DN HB, only advantage here is for first failure within 1 min, its immediate, but all later failure is throttled.

for node report, there is a new configuration at SCM discovered to avoid new container allocation, "hdds.datanode.storage.utilization.critical.threshold". We need recheck overall target of problem to solve and optimize configuration / fix inconsistency.

cc: @ChenSammi

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for node report, there is a new configuration at SCM discovered to avoid new container allocation, "hdds.datanode.storage.utilization.critical.threshold". We need recheck overall target of problem to solve and optimize configuration / fix inconsistency.

As discussed, this is dead code in Ozone and is not used anywhere.

boolean isFirstTrigger = last == -1;
boolean allowedToTrigger = (current - fullVolumeHeartbeatThrottleIntervalMs) >= last;
if (isFirstTrigger || allowedToTrigger) {
if (fullVolumeLastHeartbeatTriggerMs.compareAndSet(last, current)) {
try {
NodeReportProto nodeReport = triggerHeartbeatWithNodeReport();
LOG.info("Triggered heartbeat for full volume {}, with node report: {}.", volume, nodeReport);
} catch (IOException e) {
String volumePath = volume.getVolumeRootDir();
StorageLocationReport volumeReport = volume.getReport();
LOG.warn("Failed to create node report when handling full volume at path {} for request {}. Volume Report:" +
" {}", volumePath, request, volumeReport, e);
}
}
}
}

private NodeReportProto triggerHeartbeatWithNodeReport() throws IOException {
NodeReportProto nodeReport = context.getParent().getContainer().getNodeReport();
context.refreshFullReport(nodeReport);
context.getParent().triggerHeartbeat();
return nodeReport;
}

private long getSlowOpThresholdMs(ConfigurationSource config) {
return config.getTimeDuration(
HddsConfigKeys.HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_KEY,
Expand Down Expand Up @@ -578,9 +626,9 @@ public void validateContainerCommand(
* marked unhealthy we send Close ContainerAction to SCM.
* @param container current state of container
*/
private void sendCloseContainerActionIfNeeded(Container container) {
private void sendCloseContainerActionIfNeeded(Container container, boolean isVolumeFull) {
// We have to find a more efficient way to close a container.
boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container);
boolean isSpaceFull = isContainerFull(container) || isVolumeFull;
boolean shouldClose = isSpaceFull || isContainerUnhealthy(container);
if (shouldClose) {
ContainerData containerData = container.getContainerData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -63,6 +65,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.security.token.TokenVerifier;
Expand All @@ -79,6 +82,7 @@
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
Expand All @@ -93,6 +97,7 @@
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils.LogCapturer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -177,6 +182,9 @@ public void testContainerCloseActionWhenFull(
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));

// since the volume is not full, context.refreshFullReport(NodeReportProto) should not be called
verify(context, times(0)).refreshFullReport(any());

} finally {
volumeSet.shutdown();
ContainerMetrics.remove();
Expand Down Expand Up @@ -276,6 +284,16 @@ public void testContainerCloseActionWhenVolumeFull(
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);

// empty report object for testing that an immediate heartbeat is triggered
StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb
= StorageContainerDatanodeProtocolProtos.
NodeReportProto.newBuilder();
StorageContainerDatanodeProtocolProtos.NodeReportProto reportProto = nrb.build();
DatanodeStateMachine stateMachine = context.getParent();
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
doReturn(ozoneContainer).when(stateMachine).getContainer();
doReturn(reportProto).when(ozoneContainer).getNodeReport();
// create a 50 byte container
// available (160) > 100 (min free space) + 50 (container size)
KeyValueContainerData containerData = new KeyValueContainerData(1L,
Expand Down Expand Up @@ -308,6 +326,15 @@ public void testContainerCloseActionWhenVolumeFull(
response.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));
// verify that node report is refreshed and heartbeat is triggered
verify(context, times(1)).refreshFullReport(eq(reportProto));
verify(stateMachine, times(1)).triggerHeartbeat();

// the volume is past the min free space boundary but this time the heartbeat should not be triggered because
// of throttling
hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null);
verify(context, times(1)).refreshFullReport(eq(reportProto)); // was called once before
verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before

// try creating another container now as the volume used has crossed
// threshold
Expand All @@ -329,6 +356,95 @@ public void testContainerCloseActionWhenVolumeFull(
}
}

/**
* Tests that we log any exception properly along with volume and request details when handling a full volume.
*/
@ContainerLayoutTestInfo.ContainerTest
public void testExceptionHandlingWhenVolumeFull(ContainerLayoutVersion layoutVersion) throws IOException {
/*
SETTING UP FULL VOLUME SCENARIO AND MOCKS, SAME AS OTHER TESTS
*/
String testDirPath = testDir.getPath();
OzoneConfiguration conf = new OzoneConfiguration();
conf.setStorageSize(DatanodeConfiguration.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE,
100.0, StorageUnit.BYTES);
DatanodeDetails dd = randomDatanodeDetails();

HddsVolume.Builder volumeBuilder =
new HddsVolume.Builder(testDirPath).datanodeUuid(dd.getUuidString())
.conf(conf).usageCheckFactory(MockSpaceUsageCheckFactory.NONE);
// state of cluster : available (160) > 100 ,datanode volume
// utilisation threshold not yet reached. container creates are successful.
AtomicLong usedSpace = new AtomicLong(340);
SpaceUsageSource spaceUsage = MockSpaceUsageSource.of(500, usedSpace);

SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of(
spaceUsage, Duration.ZERO, inMemory(new AtomicLong(0)));
volumeBuilder.usageCheckFactory(factory);
MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
when(volumeSet.getVolumesList())
.thenReturn(Collections.singletonList(volumeBuilder.build()));
try {
UUID scmId = UUID.randomUUID();
ContainerSet containerSet = newContainerSet();
StateContext context = ContainerTestUtils.getMockContext(dd, conf);

/*
MOCK TO THROW AN EXCEPTION WHEN getNodeReport() IS CALLED
*/
DatanodeStateMachine stateMachine = context.getParent();
OzoneContainer ozoneContainer = mock(OzoneContainer.class);
doReturn(ozoneContainer).when(stateMachine).getContainer();
doThrow(new IOException()).when(ozoneContainer).getNodeReport();
// create a 50 byte container
// available (160) > 100 (min free space) + 50 (container size)
KeyValueContainerData containerData = new KeyValueContainerData(1L,
layoutVersion,
50, UUID.randomUUID().toString(),
dd.getUuidString());
Container container = new KeyValueContainer(containerData, conf);
StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())
.forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile()));
container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(),
scmId.toString());
containerSet.addContainer(container);
ContainerMetrics metrics = ContainerMetrics.create(conf);
Map<ContainerType, Handler> handlers = Maps.newHashMap();
for (ContainerType containerType : ContainerType.values()) {
handlers.put(containerType,
Handler.getHandlerForContainerType(containerType, conf,
context.getParent().getDatanodeDetails().getUuidString(),
containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER));
}
HddsDispatcher hddsDispatcher = new HddsDispatcher(
conf, containerSet, volumeSet, handlers, context, metrics, null);
hddsDispatcher.setClusterId(scmId.toString());
/*
CAPTURE LOGS TO ASSERT THAT THE EXCEPTION WAS LOGGED PROPERLY
*/
LogCapturer logCapturer = LogCapturer.captureLogs(HddsDispatcher.LOG);
containerData.getVolume().getVolumeUsage()
.ifPresent(usage -> usage.incrementUsedSpace(50));
usedSpace.addAndGet(50);
ContainerCommandResponseProto response = hddsDispatcher
.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null);
logCapturer.stopCapturing();

assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
verify(context, times(1))
.addContainerActionIfAbsent(any(ContainerAction.class));
/*
getNodeReport() SHOULD BE CALLED, AND LOG CAPTURE SHOULD CONTAIN THE EXCEPTION
*/
verify(ozoneContainer, times(1)).getNodeReport();
assertTrue(logCapturer.getOutput().contains("Failed to create node report when handling full volume"));
} finally {
volumeSet.shutdown();
ContainerMetrics.remove();
}
}

@Test
public void testCreateContainerWithWriteChunk() throws IOException {
String testDirPath = testDir.getPath();
Expand Down