diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index e5dec0846b48..c85f7cd9066c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_CHECKSUM_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DISK_OUT_OF_SPACE; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getContainerCommandResponse; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; @@ -55,6 +57,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -341,4 +344,16 @@ public static long getPendingDeletionBlocks(ContainerData containerData) { " not support."); } } + + public static void assertSpaceAvailability(long containerId, HddsVolume volume, int sizeRequested) + throws StorageContainerException { + final SpaceUsageSource currentUsage = volume.getCurrentUsage(); + final long spared = volume.getFreeSpaceToSpare(currentUsage.getCapacity()); + + if (currentUsage.getAvailable() - spared < sizeRequested) { + throw new StorageContainerException("Failed to write " + sizeRequested + " bytes to container " + + containerId + " due to volume " + volume.getStorageID() + " out of space " + + currentUsage + ", minimum free space spared=" + spared, DISK_OUT_OF_SPACE); + } + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java index fe4bbd1478c2..3218fb4f88d9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/KeyValueStreamDataChannel.java @@ -59,6 +59,7 @@ public int write(ReferenceCountedObject referenceCounted) throws IOException { getMetrics().incContainerOpsMetrics(getType()); assertOpen(); + assertSpaceAvailability(referenceCounted.get().remaining()); return writeBuffers(referenceCounted, buffers, this::writeFileChannel); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java index b1adfbcac5d2..43bcea5e9bd9 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/StreamDataChannelBase.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.util.Time; import org.apache.ratis.statemachine.StateMachine; @@ -93,6 +94,10 @@ public final boolean isOpen() { return getChannel().isOpen(); } + protected void assertSpaceAvailability(int requested) throws StorageContainerException { + ContainerUtils.assertSpaceAvailability(containerData.getContainerID(), containerData.getVolume(), requested); + } + public void setLinked() { linked.set(true); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java index e4c7decefd16..8e87878fcf60 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/TestKeyValueStreamDataChannel.java @@ -24,8 +24,12 @@ import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; @@ -38,6 +42,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.fs.SpaceUsageSource; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID; @@ -48,6 +53,9 @@ import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.ClientVersion; +import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.FilePositionCount; @@ -151,6 +159,29 @@ private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b) return request; } + @Test + public void testVolumeFullCase() throws Exception { + File tempFile = File.createTempFile("test-kv-stream", ".tmp"); + tempFile.deleteOnExit(); + HddsVolume mockVolume = mock(HddsVolume.class); + when(mockVolume.getStorageID()).thenReturn("storageId"); + when(mockVolume.isVolumeFull()).thenReturn(true); + when(mockVolume.getCurrentUsage()).thenReturn(new SpaceUsageSource.Fixed(100L, 0L, 100L)); + ContainerData mockContainerData = mock(ContainerData.class); + when(mockContainerData.getContainerID()).thenReturn(123L); + when(mockContainerData.getVolume()).thenReturn(mockVolume); + ContainerMetrics mockMetrics = mock(ContainerMetrics.class); + KeyValueStreamDataChannel writeChannel = new KeyValueStreamDataChannel(tempFile, mockContainerData, mockMetrics); + assertThrows(StorageContainerException.class, + () -> writeChannel.assertSpaceAvailability(1)); + final ByteBuffer putBlockBuf = ContainerCommandRequestMessage.toMessage( + PUT_BLOCK_PROTO, null).getContent().asReadOnlyByteBuffer(); + ReferenceCountedObject wrap = ReferenceCountedObject.wrap(putBlockBuf); + wrap.retain(); + assertThrows(StorageContainerException.class, () -> writeChannel.write(wrap)); + wrap.release(); + } + @Test public void testBuffers() throws Exception { final ExecutorService executor = Executors.newFixedThreadPool(32);