diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 4b586b796d05..c567ee827d11 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -141,7 +141,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
       "ozone.chunk.read.buffer.default.size";
   public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
-      "64KB";
+      "1MB";
+  public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY =
+      "ozone.chunk.read.mapped.buffer.threshold";
+  public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
+      "32KB";
 
   public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
       "ozone.scm.container.layout";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 7d069cddc63e..3948b5f04fc0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -58,6 +59,10 @@ static ChunkBuffer wrap(ByteBuffer buffer) {
 
   /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
   static ChunkBuffer wrap(List<ByteBuffer> buffers) {
+    Objects.requireNonNull(buffers, "buffers == null");
+    if (buffers.size() == 1) {
+      return wrap(buffers.get(0));
+    }
     return new ChunkBufferImplWithByteBufferList(buffers);
   }
 
@@ -91,8 +96,7 @@ default ChunkBuffer put(byte[] b) {
 
   /** Similar to {@link ByteBuffer#put(byte[])}. */
   default ChunkBuffer put(byte b) {
-    byte[] buf = new byte[1];
-    buf[0] = (byte) b;
+    final byte[] buf = {b};
     return put(buf, 0, 1);
   }
 
@@ -116,7 +120,6 @@ default ChunkBuffer put(ByteString b) {
 
   /**
    * Iterate the buffer from the current position to the current limit.
-   *
    * Upon the iteration complete,
    * the buffer's position will be equal to its limit.
    *
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f6a7f60b0a63..7c3a0c7d2d56 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -31,6 +31,7 @@
 import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -50,8 +51,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
   private int currentIndex;
 
   ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
-    Preconditions.checkArgument(buffers != null, "buffer == null");
-
+    Objects.requireNonNull(buffers, "buffers == null");
     this.buffers = !buffers.isEmpty() ?
         ImmutableList.copyOf(buffers) : EMPTY_BUFFER;
     this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 7d8f5381789d..ad5d6bdb52eb 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -829,7 +829,7 @@
   </property>
   <property>
     <name>ozone.chunk.read.buffer.default.size</name>
-    <value>64KB</value>
+    <value>1MB</value>
     <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
     <description>
       The default read buffer size during read chunk operations when checksum
@@ -840,6 +840,14 @@
       (ozone.client.bytes.per.checksum) corresponding to the chunk.
     </description>
   </property>
+  <property>
+    <name>ozone.chunk.read.mapped.buffer.threshold</name>
+    <value>32KB</value>
+    <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
+    <description>
+      Chunk reads larger than this threshold use memory mapped buffers.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.container.layout</name>
     <value>FILE_PER_BLOCK</value>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 3da43166e794..072c07be64f1 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -36,7 +36,7 @@ public class TestChunkBufferImplWithByteBufferList {
   @Test
   public void rejectsNullList() {
     List<ByteBuffer> list = null;
-    assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+    assertThrows(NullPointerException.class, () -> ChunkBuffer.wrap(list));
   }
 
   @Test
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index dae095a19318..b1a20c9aecbc 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -285,6 +285,17 @@ default long getTimeDuration(String name, String defaultValue,
     }
   }
 
+  default int getBufferSize(String name, String defaultValue) {
+    final double size = getStorageSize(name, defaultValue, StorageUnit.BYTES);
+    if (size <= 0) {
+      throw new IllegalArgumentException(name + " <= 0");
+    } else if (size > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          name + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
+    }
+    return (int) size;
+  }
+
   default double getStorageSize(String name, String defaultValue,
       StorageUnit targetUnit) {
     String vString = get(name);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 85f3e2142209..7266904139df 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -24,6 +24,7 @@
 import java.io.InterruptedIOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.file.NoSuchFileException;
@@ -32,8 +33,11 @@
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.FileAttribute;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -61,7 +65,7 @@
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
-import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -184,26 +188,24 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
   }
 
   public static ChunkBuffer readData(long len, int bufferCapacity,
-      CheckedConsumer<ByteBuffer[], StorageContainerException> readMethod)
+      File file, long off, HddsVolume volume, int readMappedBufferThreshold)
       throws StorageContainerException {
+    if (len > readMappedBufferThreshold) {
+      return readData(file, bufferCapacity, off, len, volume);
+    } else if (len == 0) {
+      return ChunkBuffer.wrap(Collections.emptyList());
+    }
+
     final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len,
         bufferCapacity);
-    readMethod.accept(buffers);
+    readData(file, off, len, c -> c.position(off).read(buffers), volume);
+    Arrays.stream(buffers).forEach(ByteBuffer::flip);
     return ChunkBuffer.wrap(Arrays.asList(buffers));
   }
 
-  /**
-   * Reads data from an existing chunk file into a list of ByteBuffers.
-   *
-   * @param file file where data lives
-   * @param buffers
-   * @param offset
-   * @param len
-   * @param volume for statistics and checker
-   */
-  public static void readData(File file, ByteBuffer[] buffers,
-      long offset, long len, HddsVolume volume)
-      throws StorageContainerException {
+  private static void readData(File file, long offset, long len,
+      CheckedFunction<FileChannel, Long, IOException> readMethod,
+      HddsVolume volume) throws StorageContainerException {
     final Path path = file.toPath();
     final long startTime = Time.monotonicNow();
@@ -213,8 +215,7 @@ public static void readData(File file, ByteBuffer[] buffers,
     bytesRead = processFileExclusively(path, () -> {
       try (FileChannel channel = open(path, READ_OPTIONS, NO_ATTRIBUTES);
           FileLock ignored = channel.lock(offset, len, true)) {
-
-        return channel.position(offset).read(buffers);
+        return readMethod.apply(channel);
       } catch (IOException e) {
         onFailure(volume);
         throw new UncheckedIOException(e);
@@ -239,10 +240,37 @@ public static void readData(File file, ByteBuffer[] buffers,
         bytesRead, offset, file);
 
     validateReadSize(len, bytesRead);
+  }
 
-    for (ByteBuffer buf : buffers) {
-      buf.flip();
-    }
+  /**
+   * Read data from the given file using
+   * {@link FileChannel#map(FileChannel.MapMode, long, long)},
+   * whose javadoc recommends that it is generally only worth mapping
+   * relatively large files (larger than a few tens of kilobytes)
+   * into memory from the standpoint of performance.
+   *
+   * @return a list of {@link MappedByteBuffer} containing the data.
+   */
+  private static ChunkBuffer readData(File file, int chunkSize,
+      long offset, long length, HddsVolume volume)
+      throws StorageContainerException {
+
+    final List<ByteBuffer> buffers = new ArrayList<>(
+        Math.toIntExact((length - 1) / chunkSize) + 1);
+    readData(file, offset, length, channel -> {
+      long readLen = 0;
+      while (readLen < length) {
+        final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
+        final ByteBuffer mapped = channel.map(
+            FileChannel.MapMode.READ_ONLY, offset + readLen, n);
+        LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
+            offset, readLen, n, mapped.getClass());
+        readLen += mapped.remaining();
+        buffers.add(mapped);
+      }
+      return readLen;
+    }, volume);
+    return ChunkBuffer.wrap(buffers);
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 449fe46ae07d..62896561f254 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -24,7 +24,6 @@
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -60,6 +59,7 @@ public class BlockManagerImpl implements BlockManager {
 
   // Default Read Buffer capacity when Checksum is not present
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
 
   /**
    * Constructs a Block Manager.
@@ -69,19 +69,12 @@ public class BlockManagerImpl implements BlockManager {
   public BlockManagerImpl(ConfigurationSource conf) {
     Preconditions.checkNotNull(conf, "Config cannot be null");
     this.config = conf;
-    final double size = config.getStorageSize(
+    this.defaultReadBufferCapacity = config.getBufferSize(
         ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
-        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    if (size <= 0) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY + " <= 0");
-    } else if (size > Integer.MAX_VALUE) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY
-              + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
-    }
-    this.defaultReadBufferCapacity = (int) size;
+        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT);
+    this.readMappedBufferThreshold = config.getBufferSize(
+        ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
+        ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
   }
 
   @Override
@@ -267,6 +260,10 @@ public int getDefaultReadBufferCapacity() {
     return defaultReadBufferCapacity;
   }
 
+  public int getReadMappedBufferThreshold() {
+    return readMappedBufferThreshold;
+  }
+
   /**
    * Deletes an existing block.
   * As Deletion is handled by BlockDeletingService,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index ccab7f35e81f..040b03c3dce2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -60,7 +60,6 @@
 import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;
-import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.readData;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.validateChunkForOverwrite;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.verifyChunkFileExists;
 
@@ -75,6 +74,7 @@ public class FilePerBlockStrategy implements ChunkManager {
   private final boolean doSyncWrite;
   private final OpenFiles files = new OpenFiles();
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
   private final VolumeSet volumeSet;
 
   public FilePerBlockStrategy(boolean sync, BlockManager manager,
@@ -82,6 +82,8 @@ public FilePerBlockStrategy(boolean sync, BlockManager manager,
     doSyncWrite = sync;
     this.defaultReadBufferCapacity = manager == null ? 0
         : manager.getDefaultReadBufferCapacity();
+    this.readMappedBufferThreshold = manager == null ? 0
+        : manager.getReadMappedBufferThreshold();
     this.volumeSet = volSet;
   }
 
@@ -192,8 +194,8 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
     long offset = info.getOffset();
     int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
         defaultReadBufferCapacity);
-    return readData(len, bufferCapacity,
-        array -> readData(chunkFile, array, offset, len, volume));
+    return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
+        readMappedBufferThreshold);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 13aa9c50f774..31a340f310b8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -20,14 +20,12 @@
 
 import com.google.common.base.Preconditions;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -48,7 +46,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -69,6 +66,7 @@ public class FilePerChunkStrategy implements ChunkManager {
   private final boolean doSyncWrite;
   private final BlockManager blockManager;
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
   private final VolumeSet volumeSet;
 
   public FilePerChunkStrategy(boolean sync, BlockManager manager,
@@ -77,6 +75,8 @@ public FilePerChunkStrategy(boolean sync, BlockManager manager,
     blockManager = manager;
     this.defaultReadBufferCapacity = manager == null ? 0
         : manager.getDefaultReadBufferCapacity();
+    this.readMappedBufferThreshold = manager == null ? 0
+        : manager.getReadMappedBufferThreshold();
     this.volumeSet = volSet;
   }
 
@@ -233,9 +233,6 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
     int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
         defaultReadBufferCapacity);
 
-    ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
-        bufferCapacity);
-
     long chunkFileOffset = 0;
     if (info.getOffset() != 0) {
       try {
@@ -267,8 +264,8 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
         if (file.exists()) {
           long offset = info.getOffset() - chunkFileOffset;
           Preconditions.checkState(offset >= 0);
-          ChunkUtils.readData(file, dataBuffers, offset, len, volume);
-          return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers));
+          return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
+              readMappedBufferThreshold);
         }
       } catch (StorageContainerException ex) {
         //UNABLE TO FIND chunk is not a problem as we will try with the
@@ -276,7 +273,6 @@ public ChunkBuffer readChunk(Container container, BlockID blockID,
         if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
           throw ex;
         }
-        BufferUtils.clearBuffers(dataBuffers);
       }
     }
     throw new StorageContainerException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index aa7285a232cc..02b7e93d50f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -93,6 +93,9 @@ long getCommittedBlockLength(Container container, BlockID blockID)
 
   int getDefaultReadBufferCapacity();
 
+  /** @return the threshold to read using memory mapped buffers. */
+  int getReadMappedBufferThreshold();
+
   /**
    * Shutdown ContainerManager.
   */
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
index 554265688d1e..037de863c003 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -27,6 +30,7 @@
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -36,7 +40,6 @@
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 
 import org.apache.ozone.test.GenericTestUtils;
@@ -64,6 +67,16 @@ public class TestChunkUtils {
       LoggerFactory.getLogger(TestChunkUtils.class);
   private static final String PREFIX = TestChunkUtils.class.getSimpleName();
 
+  private static final int BUFFER_CAPACITY = 1 << 20;
+  private static final int MAPPED_BUFFER_THRESHOLD = 32 << 10;
+  private static final Random RANDOM = new Random();
+
+  static ChunkBuffer readData(File file, long off, long len)
+      throws StorageContainerException {
+    LOG.info("off={}, len={}", off, len);
+    return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null,
+        MAPPED_BUFFER_THRESHOLD);
+  }
 
   @Test
   public void concurrentReadOfSameFile() throws Exception {
@@ -85,12 +98,11 @@ public void concurrentReadOfSameFile() throws Exception {
       final int threadNumber = i;
       executor.execute(() -> {
         try {
-          ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
-          ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+          final ChunkBuffer chunk = readData(file, offset, len);
           // There should be only one element in readBuffers
-          Assertions.assertEquals(1, readBuffers.length);
-          ByteBuffer readBuffer = readBuffers[0];
+          final List<ByteBuffer> buffers = chunk.asByteBufferList();
+          Assertions.assertEquals(1, buffers.size());
+          final ByteBuffer readBuffer = buffers.get(0);
           LOG.info("Read data ({}): {}", threadNumber,
               new String(readBuffer.array(), UTF_8));
 
@@ -172,12 +184,11 @@ public void serialRead() throws Exception {
     int offset = 0;
     ChunkUtils.writeData(file, data, offset, len, null, true);
 
-    ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
-    ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+    final ChunkBuffer chunk = readData(file, offset, len);
     // There should be only one element in readBuffers
-    Assertions.assertEquals(1, readBuffers.length);
-    ByteBuffer readBuffer = readBuffers[0];
+    final List<ByteBuffer> buffers = chunk.asByteBufferList();
+    Assertions.assertEquals(1, buffers.size());
+    final ByteBuffer readBuffer = buffers.get(0);
 
     assertArrayEquals(array, readBuffer.array());
     assertEquals(len, readBuffer.remaining());
@@ -220,15 +231,90 @@ public void readMissingFile() {
     int len = 123;
     int offset = 0;
     File nonExistentFile = new File("nosuchfile");
-    ByteBuffer[] bufs = BufferUtils.assignByteBuffers(len, len);
 
     // when
     StorageContainerException e = assertThrows(
         StorageContainerException.class,
-        () -> ChunkUtils.readData(nonExistentFile, bufs, offset, len, null));
+        () -> readData(nonExistentFile, offset, len));
 
     // then
     Assertions.assertEquals(UNABLE_TO_FIND_CHUNK, e.getResult());
   }
 
+  @Test
+  public void testReadData() throws Exception {
+    final File dir = GenericTestUtils.getTestDir("testReadData");
+    try {
+      Assertions.assertTrue(dir.mkdirs());
+
+      // large file
+      final int large = 10 << 20; // 10MB
+      Assertions.assertTrue(large > MAPPED_BUFFER_THRESHOLD);
+      runTestReadFile(large, dir, true);
+
+      // small file
+      final int small = 30 << 10; // 30KB
+      Assertions.assertTrue(small <= MAPPED_BUFFER_THRESHOLD);
+      runTestReadFile(small, dir, false);
+
+      // boundary case
+      runTestReadFile(MAPPED_BUFFER_THRESHOLD, dir, false);
+
+      // empty file
+      runTestReadFile(0, dir, false);
+
+      for (int i = 0; i < 10; i++) {
+        final int length = RANDOM.nextInt(2 * MAPPED_BUFFER_THRESHOLD) + 1;
+        runTestReadFile(length, dir, length > MAPPED_BUFFER_THRESHOLD);
+      }
+    } finally {
+      FileUtils.deleteDirectory(dir);
+    }
+  }
+
+  void runTestReadFile(int length, File dir, boolean isMapped)
+      throws Exception {
+    final File file;
+    for (int i = length; ; i++) {
+      final File f = new File(dir, "file_" + i);
+      if (!f.exists()) {
+        file = f;
+        break;
+      }
+    }
+    LOG.info("file: {}", file);
+
+    // write a file
+    final byte[] array = new byte[BUFFER_CAPACITY];
+    final long seed = System.nanoTime();
+    LOG.info("seed: {}", seed);
+    RANDOM.setSeed(seed);
+    try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(
+        file.toPath(), StandardOpenOption.CREATE_NEW))) {
+      for (int written = 0; written < length;) {
+        RANDOM.nextBytes(array);
+        final int remaining = length - written;
+        final int toWrite = Math.min(remaining, array.length);
+        out.write(array, 0, toWrite);
+        written += toWrite;
+      }
+    }
+    Assertions.assertEquals(length, file.length());
+
+    // read the file back
+    final ChunkBuffer chunk = readData(file, 0, length);
+    Assertions.assertEquals(length, chunk.remaining());
+
+    final List<ByteBuffer> buffers = chunk.asByteBufferList();
+    LOG.info("buffers.size(): {}", buffers.size());
+    Assertions.assertEquals((length - 1) / BUFFER_CAPACITY + 1, buffers.size());
+    LOG.info("buffer class: {}", buffers.get(0).getClass());
+
+    RANDOM.setSeed(seed);
+    for (ByteBuffer b : buffers) {
+      Assertions.assertEquals(isMapped, b instanceof MappedByteBuffer);
+      RANDOM.nextBytes(array);
+      Assertions.assertEquals(ByteBuffer.wrap(array, 0, b.remaining()), b);
+    }
+  }
 }
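
Reviewer note (not part of the patch): the standalone sketch below illustrates the read-path selection that the new ChunkUtils.readData performs -- a read longer than ozone.chunk.read.mapped.buffer.threshold (default 32KB) is served from FileChannel.map() in bufferCapacity-sized pieces, while a shorter read is copied into a regular on-heap buffer. The class and method names (MappedReadSketch, read) are illustrative only and do not exist in the codebase; this is a minimal sketch of the technique, not the patched code itself.

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class MappedReadSketch {

  /** Choose memory mapped vs. heap buffers by read length, as in the patch. */
  static List<ByteBuffer> read(File file, long offset, long len,
      int bufferCapacity, int mappedBufferThreshold) throws IOException {
    if (len == 0) {
      return Collections.emptyList();
    }
    try (FileChannel channel = FileChannel.open(file.toPath())) {
      final List<ByteBuffer> buffers = new ArrayList<>();
      if (len > mappedBufferThreshold) {
        // Large read: map the region read-only in bufferCapacity-sized pieces,
        // so no copy onto the Java heap is made.
        for (long done = 0; done < len;) {
          final int n = Math.toIntExact(Math.min(len - done, bufferCapacity));
          buffers.add(channel.map(FileChannel.MapMode.READ_ONLY, offset + done, n));
          done += n;
        }
      } else {
        // Small read: copy into a single regular on-heap buffer.
        final ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(len));
        channel.position(offset).read(buffer);
        buffer.flip();
        buffers.add(buffer);
      }
      return buffers;
    }
  }

  public static void main(String[] args) throws IOException {
    final File file = File.createTempFile("mapped-read-sketch", ".bin");
    file.deleteOnExit();
    Files.write(file.toPath(), new byte[64 << 10]); // 64KB test file

    // 64KB > 32KB threshold: buffers come from FileChannel.map() (direct).
    System.out.println(read(file, 0, 64 << 10, 1 << 20, 32 << 10).get(0).isDirect());
    // 4KB <= 32KB threshold: a single heap ByteBuffer is returned.
    System.out.println(read(file, 0, 4 << 10, 1 << 20, 32 << 10).get(0).isDirect());
  }
}

The split mirrors the rationale quoted in the new ChunkUtils javadoc: FileChannel#map is generally only worthwhile for regions larger than a few tens of kilobytes, which is why small reads keep using plain byte buffers and the default threshold is 32KB.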