diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index c4e03e453349..4eb945188873 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.container.common.impl; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL_DEFAULT; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder; @@ -32,6 +34,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.client.BlockID; @@ -45,6 +48,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; @@ -110,6 +114,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { private ContainerMetrics metrics; private final TokenVerifier tokenVerifier; private long slowOpThresholdNs; + private AtomicLong fullVolumeLastHeartbeatTriggerMs; + private long fullVolumeHeartbeatThrottleIntervalMs; /** * Constructs an OzoneContainer that receives calls from @@ -130,6 +136,10 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet, this.tokenVerifier = tokenVerifier != null ? tokenVerifier : new NoopTokenVerifier(); this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000; + fullVolumeLastHeartbeatTriggerMs = new AtomicLong(-1); + long nodeReportInterval = conf.getTimeDuration(HDDS_NODE_REPORT_INTERVAL, HDDS_NODE_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + fullVolumeHeartbeatThrottleIntervalMs = Math.min(nodeReportInterval, 60000); // min of interval and 1 minute protocolMetrics = new ProtocolMessageMetrics<>( @@ -335,7 +345,11 @@ && getMissingContainerSet().contains(containerID)) { // Small performance optimization. We check if the operation is of type // write before trying to send CloseContainerAction. if (!HddsUtils.isReadOnly(msg)) { - sendCloseContainerActionIfNeeded(container); + boolean isFull = isVolumeFull(container); + sendCloseContainerActionIfNeeded(container, isFull); + if (isFull) { + handleFullVolume(container.getContainerData().getVolume(), msg); + } } Handler handler = getHandler(containerType); if (handler == null) { @@ -403,7 +417,7 @@ && getMissingContainerSet().contains(containerID)) { // in any case, the in memory state of the container should be unhealthy Preconditions.checkArgument( container.getContainerData().getState() == State.UNHEALTHY); - sendCloseContainerActionIfNeeded(container); + sendCloseContainerActionIfNeeded(container, isVolumeFull(container)); } if (cmdType == Type.CreateContainer && result == Result.SUCCESS && dispatcherContext != null) { @@ -435,6 +449,40 @@ && getMissingContainerSet().contains(containerID)) { } } + /** + * If the volume is full, we need to inform SCM about the latest volume usage stats and send the close container + * action for this container immediately. {@link HddsDispatcher#sendCloseContainerActionIfNeeded(Container, boolean)} + * just adds the action to the heartbeat. Here, we get the latest storage statistics for this node, add them to the + * heartbeat, and then send the heartbeat (including container close action) immediately. + * @param volume the volume being written to + */ + private void handleFullVolume(HddsVolume volume, ContainerCommandRequestProto request) { + long current = System.currentTimeMillis(); + long last = fullVolumeLastHeartbeatTriggerMs.get(); + boolean isFirstTrigger = last == -1; + boolean allowedToTrigger = (current - fullVolumeHeartbeatThrottleIntervalMs) >= last; + if (isFirstTrigger || allowedToTrigger) { + if (fullVolumeLastHeartbeatTriggerMs.compareAndSet(last, current)) { + try { + NodeReportProto nodeReport = triggerHeartbeatWithNodeReport(); + LOG.info("Triggered heartbeat for full volume {}, with node report: {}.", volume, nodeReport); + } catch (IOException e) { + String volumePath = volume.getVolumeRootDir(); + StorageLocationReport volumeReport = volume.getReport(); + LOG.warn("Failed to create node report when handling full volume at path {} for request {}. Volume Report:" + + " {}", volumePath, request, volumeReport, e); + } + } + } + } + + private NodeReportProto triggerHeartbeatWithNodeReport() throws IOException { + NodeReportProto nodeReport = context.getParent().getContainer().getNodeReport(); + context.refreshFullReport(nodeReport); + context.getParent().triggerHeartbeat(); + return nodeReport; + } + private long getSlowOpThresholdMs(ConfigurationSource config) { return config.getTimeDuration( HddsConfigKeys.HDDS_DATANODE_SLOW_OP_WARNING_THRESHOLD_KEY, @@ -578,9 +626,9 @@ public void validateContainerCommand( * marked unhealthy we send Close ContainerAction to SCM. * @param container current state of container */ - private void sendCloseContainerActionIfNeeded(Container container) { + private void sendCloseContainerActionIfNeeded(Container container, boolean isVolumeFull) { // We have to find a more efficient way to close a container. - boolean isSpaceFull = isContainerFull(container) || isVolumeFull(container); + boolean isSpaceFull = isContainerFull(container) || isVolumeFull; boolean shouldClose = isSpaceFull || isContainerUnhealthy(container); if (shouldClose) { ContainerData containerData = container.getContainerData(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index 6afcadb809b9..408f8fb771d0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -28,8 +28,10 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -63,6 +65,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.token.TokenVerifier; @@ -79,6 +82,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op; @@ -93,6 +97,7 @@ import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.security.token.Token; import org.apache.ozone.test.GenericTestUtils.LogCapturer; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -177,6 +182,9 @@ public void testContainerCloseActionWhenFull( verify(context, times(1)) .addContainerActionIfAbsent(any(ContainerAction.class)); + // since the volume is not full, context.refreshFullReport(NodeReportProto) should not be called + verify(context, times(0)).refreshFullReport(any()); + } finally { volumeSet.shutdown(); ContainerMetrics.remove(); @@ -276,6 +284,16 @@ public void testContainerCloseActionWhenVolumeFull( UUID scmId = UUID.randomUUID(); ContainerSet containerSet = newContainerSet(); StateContext context = ContainerTestUtils.getMockContext(dd, conf); + + // empty report object for testing that an immediate heartbeat is triggered + StorageContainerDatanodeProtocolProtos.NodeReportProto.Builder nrb + = StorageContainerDatanodeProtocolProtos. + NodeReportProto.newBuilder(); + StorageContainerDatanodeProtocolProtos.NodeReportProto reportProto = nrb.build(); + DatanodeStateMachine stateMachine = context.getParent(); + OzoneContainer ozoneContainer = mock(OzoneContainer.class); + doReturn(ozoneContainer).when(stateMachine).getContainer(); + doReturn(reportProto).when(ozoneContainer).getNodeReport(); // create a 50 byte container // available (160) > 100 (min free space) + 50 (container size) KeyValueContainerData containerData = new KeyValueContainerData(1L, @@ -308,6 +326,15 @@ public void testContainerCloseActionWhenVolumeFull( response.getResult()); verify(context, times(1)) .addContainerActionIfAbsent(any(ContainerAction.class)); + // verify that node report is refreshed and heartbeat is triggered + verify(context, times(1)).refreshFullReport(eq(reportProto)); + verify(stateMachine, times(1)).triggerHeartbeat(); + + // the volume is past the min free space boundary but this time the heartbeat should not be triggered because + // of throttling + hddsDispatcher.dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null); + verify(context, times(1)).refreshFullReport(eq(reportProto)); // was called once before + verify(stateMachine, times(1)).triggerHeartbeat(); // was called once before // try creating another container now as the volume used has crossed // threshold @@ -329,6 +356,95 @@ public void testContainerCloseActionWhenVolumeFull( } } + /** + * Tests that we log any exception properly along with volume and request details when handling a full volume. + */ + @ContainerLayoutTestInfo.ContainerTest + public void testExceptionHandlingWhenVolumeFull(ContainerLayoutVersion layoutVersion) throws IOException { + /* + SETTING UP FULL VOLUME SCENARIO AND MOCKS, SAME AS OTHER TESTS + */ + String testDirPath = testDir.getPath(); + OzoneConfiguration conf = new OzoneConfiguration(); + conf.setStorageSize(DatanodeConfiguration.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE, + 100.0, StorageUnit.BYTES); + DatanodeDetails dd = randomDatanodeDetails(); + + HddsVolume.Builder volumeBuilder = + new HddsVolume.Builder(testDirPath).datanodeUuid(dd.getUuidString()) + .conf(conf).usageCheckFactory(MockSpaceUsageCheckFactory.NONE); + // state of cluster : available (160) > 100 ,datanode volume + // utilisation threshold not yet reached. container creates are successful. + AtomicLong usedSpace = new AtomicLong(340); + SpaceUsageSource spaceUsage = MockSpaceUsageSource.of(500, usedSpace); + + SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of( + spaceUsage, Duration.ZERO, inMemory(new AtomicLong(0))); + volumeBuilder.usageCheckFactory(factory); + MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + when(volumeSet.getVolumesList()) + .thenReturn(Collections.singletonList(volumeBuilder.build())); + try { + UUID scmId = UUID.randomUUID(); + ContainerSet containerSet = newContainerSet(); + StateContext context = ContainerTestUtils.getMockContext(dd, conf); + + /* + MOCK TO THROW AN EXCEPTION WHEN getNodeReport() IS CALLED + */ + DatanodeStateMachine stateMachine = context.getParent(); + OzoneContainer ozoneContainer = mock(OzoneContainer.class); + doReturn(ozoneContainer).when(stateMachine).getContainer(); + doThrow(new IOException()).when(ozoneContainer).getNodeReport(); + // create a 50 byte container + // available (160) > 100 (min free space) + 50 (container size) + KeyValueContainerData containerData = new KeyValueContainerData(1L, + layoutVersion, + 50, UUID.randomUUID().toString(), + dd.getUuidString()); + Container container = new KeyValueContainer(containerData, conf); + StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()) + .forEach(hddsVolume -> hddsVolume.setDbParentDir(tempDir.toFile())); + container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), + scmId.toString()); + containerSet.addContainer(container); + ContainerMetrics metrics = ContainerMetrics.create(conf); + Map handlers = Maps.newHashMap(); + for (ContainerType containerType : ContainerType.values()) { + handlers.put(containerType, + Handler.getHandlerForContainerType(containerType, conf, + context.getParent().getDatanodeDetails().getUuidString(), + containerSet, volumeSet, volumeChoosingPolicy, metrics, NO_OP_ICR_SENDER)); + } + HddsDispatcher hddsDispatcher = new HddsDispatcher( + conf, containerSet, volumeSet, handlers, context, metrics, null); + hddsDispatcher.setClusterId(scmId.toString()); + /* + CAPTURE LOGS TO ASSERT THAT THE EXCEPTION WAS LOGGED PROPERLY + */ + LogCapturer logCapturer = LogCapturer.captureLogs(HddsDispatcher.LOG); + containerData.getVolume().getVolumeUsage() + .ifPresent(usage -> usage.incrementUsedSpace(50)); + usedSpace.addAndGet(50); + ContainerCommandResponseProto response = hddsDispatcher + .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null); + logCapturer.stopCapturing(); + + assertEquals(ContainerProtos.Result.SUCCESS, + response.getResult()); + verify(context, times(1)) + .addContainerActionIfAbsent(any(ContainerAction.class)); + /* + getNodeReport() SHOULD BE CALLED, AND LOG CAPTURE SHOULD CONTAIN THE EXCEPTION + */ + verify(ozoneContainer, times(1)).getNodeReport(); + assertTrue(logCapturer.getOutput().contains("Failed to create node report when handling full volume")); + } finally { + volumeSet.shutdown(); + ContainerMetrics.remove(); + } + } + @Test public void testCreateContainerWithWriteChunk() throws IOException { String testDirPath = testDir.getPath();