Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_CHECKSUM_ERROR;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_OPEN;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DISK_OUT_OF_SPACE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_FIND_DATA_DIR;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getContainerCommandResponse;
Expand All @@ -44,6 +45,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
Expand All @@ -55,6 +57,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -341,4 +344,16 @@ public static long getPendingDeletionBlocks(ContainerData containerData) {
" not support.");
}
}

/**
 * Verifies that {@code volume} can absorb a write of {@code sizeRequested}
 * bytes without dipping into the volume's reserved free space.
 *
 * @param containerId   id of the container the write targets (for the error message)
 * @param volume        volume that would receive the data
 * @param sizeRequested number of bytes about to be written
 * @throws StorageContainerException with result {@code DISK_OUT_OF_SPACE} when
 *         the usable space (available minus the spared minimum) is smaller
 *         than the requested size
 */
public static void assertSpaceAvailability(long containerId, HddsVolume volume, int sizeRequested)
    throws StorageContainerException {
  final SpaceUsageSource usage = volume.getCurrentUsage();
  final long minFreeToSpare = volume.getFreeSpaceToSpare(usage.getCapacity());
  final long usable = usage.getAvailable() - minFreeToSpare;

  if (usable < sizeRequested) {
    final String message = "Failed to write " + sizeRequested + " bytes to container "
        + containerId + " due to volume " + volume.getStorageID() + " out of space "
        + usage + ", minimum free space spared=" + minFreeToSpare;
    throw new StorageContainerException(message, DISK_OUT_OF_SPACE);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public int write(ReferenceCountedObject<ByteBuffer> referenceCounted)
throws IOException {
getMetrics().incContainerOpsMetrics(getType());
assertOpen();
assertSpaceAvailability(referenceCounted.get().remaining());

return writeBuffers(referenceCounted, buffers, this::writeFileChannel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.util.Time;
import org.apache.ratis.statemachine.StateMachine;
Expand Down Expand Up @@ -93,6 +94,10 @@ public final boolean isOpen() {
return getChannel().isOpen();
}

/**
 * Checks that the volume backing this channel's container has room for a
 * write of {@code requested} bytes, delegating to
 * {@link ContainerUtils#assertSpaceAvailability(long, HddsVolume, int)}.
 *
 * @param requested number of bytes about to be written
 * @throws StorageContainerException if the volume lacks sufficient usable space
 */
protected void assertSpaceAvailability(int requested) throws StorageContainerException {
ContainerUtils.assertSpaceAvailability(containerData.getContainerID(), containerData.getVolume(), requested);
}

/** Marks this channel as linked by setting the {@code linked} flag to {@code true}. */
public void setLinked() {
linked.set(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@
import static org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.writeFully;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
Expand All @@ -38,6 +42,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
Expand All @@ -48,6 +53,9 @@
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel.WriteMethod;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.io.FilePositionCount;
Expand Down Expand Up @@ -151,6 +159,29 @@ private static ContainerCommandRequestProto readPutBlockRequest(ByteBuffer b)
return request;
}

@Test
public void testVolumeFullCase() throws Exception {
  // Back the channel with a real temp file; removed on JVM exit as a safety net.
  File tempFile = File.createTempFile("test-kv-stream", ".tmp");
  tempFile.deleteOnExit();

  // Volume reporting zero usable space: capacity=100, available=0, used=100.
  // getFreeSpaceToSpare is left unstubbed (Mockito default 0L), so usable space is 0.
  HddsVolume mockVolume = mock(HddsVolume.class);
  when(mockVolume.getStorageID()).thenReturn("storageId");
  when(mockVolume.isVolumeFull()).thenReturn(true);
  when(mockVolume.getCurrentUsage()).thenReturn(new SpaceUsageSource.Fixed(100L, 0L, 100L));

  ContainerData mockContainerData = mock(ContainerData.class);
  when(mockContainerData.getContainerID()).thenReturn(123L);
  when(mockContainerData.getVolume()).thenReturn(mockVolume);
  ContainerMetrics mockMetrics = mock(ContainerMetrics.class);

  final ByteBuffer putBlockBuf = ContainerCommandRequestMessage.toMessage(
      PUT_BLOCK_PROTO, null).getContent().asReadOnlyByteBuffer();
  ReferenceCountedObject<ByteBuffer> wrap = ReferenceCountedObject.wrap(putBlockBuf);
  wrap.retain();
  try (KeyValueStreamDataChannel writeChannel =
      new KeyValueStreamDataChannel(tempFile, mockContainerData, mockMetrics)) {
    // The direct space check must fail on a full volume...
    assertThrows(StorageContainerException.class,
        () -> writeChannel.assertSpaceAvailability(1));
    // ...and the write path must surface the same failure before buffering data.
    assertThrows(StorageContainerException.class, () -> writeChannel.write(wrap));
  } finally {
    // Balance the retain() above even if an assertion fails.
    wrap.release();
  }
}

@Test
public void testBuffers() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(32);
Expand Down