From 4e21a60f2c6b24d4596a73446a99018d280f25a4 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Fri, 10 Jun 2022 15:51:54 -0500 Subject: [PATCH 1/6] HADOOP-18106: Handle memory fragmentation in S3A Vectored IO implementation. Part of HADOOP-18103. Handle memory fragmentation in the S3A vectored IO implementation by allocating buffers only of the sizes requested in the user ranges, filling them directly from the remote S3 stream, and draining the unwanted data between ranges. This patch also aborts active vectored reads when the stream is closed or unbuffer() is called. --- .../apache/hadoop/fs/ChecksumFileSystem.java | 3 +- .../apache/hadoop/fs/PositionedReadable.java | 3 +- .../apache/hadoop/fs/RawLocalFileSystem.java | 14 +- .../hadoop/fs/impl/VectoredReadUtils.java | 79 ++++---- .../hadoop/fs/TestChecksumFileSystem.java | 3 +- .../AbstractContractVectoredReadTest.java | 86 ++++++-- .../TestLocalFSContractVectoredRead.java | 52 +++++ .../hadoop/fs/impl/TestVectoredReadUtils.java | 38 ++-- .../apache/hadoop/fs/s3a/S3AInputStream.java | 185 ++++++++++++------ .../s3a/ITestS3AContractVectoredRead.java | 51 +++++ .../s3a/scale/AbstractSTestS3AHugeFiles.java | 8 +- 11 files changed, 385 insertions(+), 137 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index a6bdc220ba201..d706226b51f8e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -55,6 +55,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; /**************************************************************** * Abstract Checksumed FileSystem.
@@ -408,7 +409,7 @@ public void readVectored(List ranges, int minSeek = minSeekForVectorReads(); int maxSize = maxReadSizeForVectorReads(); List dataRanges = - VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, + VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum, minSeek, maxReadSizeForVectorReads()); List checksumRanges = findChecksumRanges(dataRanges, bytesPerSum, minSeek, maxSize); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 7e543ebf22669..26d7b3e90904e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -121,7 +121,6 @@ default int maxReadSizeForVectorReads() { */ default void readVectored(List ranges, IntFunction allocate) throws IOException { - VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), - maxReadSizeForVectorReads()); + VectoredReadUtils.readVectored(this, ranges, allocate); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 208d1668b620c..eece36310e131 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -68,6 +68,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; @@ -303,23 +304,24 @@ AsynchronousFileChannel getAsyncChannel() throws IOException { public void readVectored(List ranges, IntFunction allocate) throws IOException { + List sortedRanges = Arrays.asList(sortRanges(ranges)); // Set up all of the futures, so that we can use them if things fail - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { VectoredReadUtils.validateRangeRequest(range); range.setData(new CompletableFuture<>()); } try { AsynchronousFileChannel channel = getAsyncChannel(); - ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; - AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); - for(int i = 0; i < ranges.size(); ++i) { - FileRange range = ranges.get(i); + ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers); + for(int i = 0; i < sortedRanges.size(); ++i) { + FileRange range = sortedRanges.get(i); buffers[i] = allocate.apply(range.getLength()); channel.read(buffers[i], range.getOffset(), i, asyncHandler); } } catch (IOException ioe) { LOG.debug("Exception occurred during vectored read ", ioe); - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { range.getData().completeExceptionally(ioe); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java index 9a16e6841ddf5..c3efd2e4695dd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java @@ -68,35 +68,19 @@ public static void validateVectoredReadRanges(List ranges) /** - * Read fully a list of file ranges asynchronously from this file. - * The default iterates through the ranges to read each synchronously, but - * the intent is that subclasses can make more efficient readers. + * This is the default implementation which iterates through the ranges + * to read each synchronously, but the intent is that subclasses + * can make more efficient readers. * The data or exceptions are pushed into {@link FileRange#getData()}. * @param stream the stream to read the data from * @param ranges the byte ranges to read * @param allocate the byte buffer allocation - * @param minimumSeek the minimum number of bytes to seek over - * @param maximumRead the largest number of bytes to combine into a single read */ public static void readVectored(PositionedReadable stream, List ranges, - IntFunction allocate, - int minimumSeek, - int maximumRead) { - if (isOrderedDisjoint(ranges, 1, minimumSeek)) { - for(FileRange range: ranges) { - range.setData(readRangeFrom(stream, range, allocate)); - } - } else { - for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, - maximumRead)) { - CompletableFuture read = - readRangeFrom(stream, range, allocate); - for(FileRange child: range.getUnderlying()) { - child.setData(read.thenApply( - (b) -> sliceTo(b, range.getOffset(), child))); - } - } + IntFunction allocate) { + for(FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); } } @@ -209,7 +193,38 @@ public static long roundUp(long offset, int chunkSize) { } /** - * Sort and merge ranges to optimize the access from the underlying file + * Check that the input ranges do not overlap, and return them sorted by offset. + * Two ranges overlap when the start offset of the second is less than the + * end offset of the first. + * End offset is calculated as start offset + length. + * @param input list of input ranges. + * @return the input ranges sorted by offset. + * @throws UnsupportedOperationException if any of the ranges overlap. + */ + public static List validateNonOverlappingAndReturnSortedRanges( + List input) { + + if (input.size() <= 1) { + return input; + } + FileRange[] sortedRanges = sortRanges(input); + FileRange prev = sortedRanges[0]; + for (int i = 1; i < sortedRanges.length; i++) { + if (sortedRanges[i].getOffset() < prev.getOffset() + prev.getLength()) { + throw new UnsupportedOperationException("Overlapping ranges are not supported"); + } + prev = sortedRanges[i]; + } + return Arrays.asList(sortedRanges); + } + + /** + * Sort the input ranges by offset. + * @param input list of input ranges. + * @return array of the ranges sorted by offset. + */ + public static FileRange[] sortRanges(List input) { + // sort the ranges by offset + FileRange[] sortedRanges = input.toArray(new FileRange[0]); + Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset)); + return sortedRanges; + } + + /** + * Merge sorted ranges to optimize the access from the underlying file * system. * The motivations are that:
@@ -219,24 +234,22 @@ public static long roundUp(long offset, int chunkSize) { * <ul> * <li>Upper layers want to pass down logical file ranges.</li> * <li>Fewer reads have better performance.</li> * <li>Applications want callbacks as ranges are read.</li> * <li>Some file systems want to round ranges to be at checksum boundaries.</li> * </ul>
* - * @param input the list of input ranges + * @param sortedRanges already sorted list of ranges based on offset. * @param chunkSize round the start and end points to multiples of chunkSize * @param minimumSeek the smallest gap that we should seek over in bytes * @param maxSize the largest combined file range in bytes * @return the list of sorted CombinedFileRanges that cover the input */ - public static List sortAndMergeRanges(List input, - int chunkSize, - int minimumSeek, - int maxSize) { - // sort the ranges by offset - FileRange[] ranges = input.toArray(new FileRange[0]); - Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset)); + public static List mergeSortedRanges(List sortedRanges, + int chunkSize, + int minimumSeek, + int maxSize) { + CombinedFileRange current = null; - List result = new ArrayList<>(ranges.length); + List result = new ArrayList<>(sortedRanges.size()); // now merge together the ones that merge - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { long start = roundDown(range.getOffset(), chunkSize); long end = roundUp(range.getOffset() + range.getLength(), chunkSize); if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java index 4d61154490838..c912f11993cb0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java @@ -19,9 +19,8 @@ package org.apache.hadoop.fs; import java.util.Arrays; +import java.util.concurrent.CompletableFuture; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.permission.FsPermission; import static org.apache.hadoop.fs.FileSystemTestHelper.*; import org.apache.hadoop.conf.Configuration; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index e8c86b5dbbc4d..af90d62247be4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; +import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -53,10 +55,12 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); public static final int DATASET_LEN = 64 * 1024; - private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; - private final IntFunction 
allocate; + protected final IntFunction allocate; + + private WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); private final String bufferType; @@ -67,8 +71,10 @@ public static List params() { public AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - this.allocate = "array".equals(bufferType) ? - ByteBuffer::allocate : ByteBuffer::allocateDirect; + this.allocate = value -> { + boolean isDirect = !"array".equals(bufferType); + return pool.getBuffer(isDirect, value); + }; } @Override @@ -79,6 +85,12 @@ public void setup() throws Exception { createFile(fs, path, true, DATASET); } + @Override + public void teardown() throws Exception { + super.teardown(); + pool.release(); + } + @Test public void testVectoredReadMultipleRanges() throws Exception { FileSystem fs = getFileSystem(); @@ -170,30 +182,74 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception { } } + @Test + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + } + } + @Test public void testSameRanges() throws Exception { + // Same ranges are a special case of overlapping ranges. FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + } + } + + protected List getSampleSameRanges() { List fileRanges = new ArrayList<>(); fileRanges.add(new FileRangeImpl(8*1024, 1000)); fileRanges.add(new FileRangeImpl(8*1024, 1000)); fileRanges.add(new FileRangeImpl(8*1024, 1000)); + return fileRanges; + } + + protected List getSampleOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(100, 500)); + fileRanges.add(new FileRangeImpl(400, 500)); + return fileRanges; + } + protected void validateUnsupportedOperation(FileSystem fs, + List fileRanges) + throws Exception { CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) .build(); try (FSDataInputStream in = builder.get()) { + LambdaTestUtils.intercept(UnsupportedOperationException.class, + () -> in.readVectored(fileRanges, allocate)); + } + } + + @Test + public void testSomeRandomNonOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(500, 100)); + fileRanges.add(new FileRangeImpl(1000, 200)); + fileRanges.add(new FileRangeImpl(50, 10)); + fileRanges.add(new FileRangeImpl(10, 5)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); } } @Test - public void testOverlappingRanges() throws Exception { + public void testConsecutiveRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 1000)); - fileRanges.add(new FileRangeImpl(90, 900)); - fileRanges.add(new FileRangeImpl(50, 900)); - fileRanges.add(new FileRangeImpl(10, 980)); + fileRanges.add(new FileRangeImpl(500, 100)); + fileRanges.add(new FileRangeImpl(600, 200)); + fileRanges.add(new FileRangeImpl(800, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges,
allocate); validateVectoredReadResult(fileRanges, DATASET); @@ -242,7 +298,7 @@ public void testNegativeOffsetRange() throws Exception { @Test public void testNormalReadAfterVectoredRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSomeRandomRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); // read starting 200 bytes @@ -260,7 +316,7 @@ public void testNormalReadAfterVectoredRead() throws Exception { @Test public void testVectoredReadAfterNormalRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeOverlappingRanges(); + List fileRanges = createSomeRandomRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { // read starting 200 bytes byte[] res = new byte[200]; @@ -278,8 +334,8 @@ public void testVectoredReadAfterNormalRead() throws Exception { @Test public void testMultipleVectoredReads() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges1 = createSomeOverlappingRanges(); - List fileRanges2 = createSomeOverlappingRanges(); + List fileRanges1 = createSomeRandomRanges(); + List fileRanges2 = createSomeRandomRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges2, allocate); @@ -288,10 +344,10 @@ public void testMultipleVectoredReads() throws Exception { } } - protected List createSomeOverlappingRanges() { + protected List createSomeRandomRanges() { List fileRanges = new ArrayList<>(); fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(90, 50)); + fileRanges.add(new FileRangeImpl(110, 50)); return fileRanges; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 099e3b946d18a..3ac7ca694af62 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -18,9 +18,27 @@ package org.apache.hadoop.fs.contract.localfs; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest { @@ -32,4 +50,38 @@ public TestLocalFSContractVectoredRead(String bufferType) { protected AbstractFSContract createContract(Configuration conf) { return new 
LocalFSContract(conf); } + + @Test + public void testChecksumValidationDuringVectoredRead() throws Exception { + Path testPath = path("big_range_checksum"); + LocalFileSystem localFs = (LocalFileSystem) getFileSystem(); + byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + try (FSDataOutputStream out = localFs.create(testPath, true)){ + out.write(DATASET_CORRECT); + } + Path checksumPath = localFs.getChecksumFile(testPath); + Assertions.assertThat(localFs.exists(checksumPath)) + .describedAs("Checksum file should be present") + .isTrue(); + CompletableFuture fis = localFs.openFile(testPath).build(); + List someRandomRanges = new ArrayList<>(); + someRandomRanges.add(new FileRangeImpl(10, 1024)); + someRandomRanges.add(new FileRangeImpl(1025, 1024)); + try (FSDataInputStream in = fis.get()){ + in.readVectored(someRandomRanges, allocate); + validateVectoredReadResult(someRandomRanges, DATASET_CORRECT); + } + byte[] DATASET_CORRUPTED = ContractTestUtils.dataset(DATASET_LEN, 'a', 64); + try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ + out.write(DATASET_CORRUPTED); + } + CompletableFuture fisN = localFs.openFile(testPath).build(); + try (FSDataInputStream in = fisN.get()){ + in.readVectored(someRandomRanges, allocate); + // Expect checksum exception when data is updated directly through + // raw local fs instance. + intercept(ChecksumException.class, + () -> validateVectoredReadResult(someRandomRanges, DATASET_CORRUPTED)); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java index cfd366701be6b..4bf842c70dc9e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.test.HadoopTestBase; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; @@ -141,8 +142,8 @@ public void testSortAndMerge() { new FileRangeImpl(1000, 100) ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); assertEquals("merged range size", 1, outputList.size()); CombinedFileRange output = outputList.get(0); assertEquals("merged range underlying size", 3, output.getUnderlying().size()); @@ -152,7 +153,7 @@ public void testSortAndMerge() { // the minSeek doesn't allow the first two to merge assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 100, 1000, 2100); assertEquals("merged range size", 2, outputList.size()); assertEquals("range[1000,1100)", outputList.get(0).toString()); assertEquals("range[2100,3100)", outputList.get(1).toString()); @@ -161,7 +162,7 @@ public void testSortAndMerge() { // the maxSize doesn't allow the third range to 
merge assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 100, 1001, 2099); assertEquals("merged range size", 2, outputList.size()); assertEquals("range[1000,2200)", outputList.get(0).toString()); assertEquals("range[3000,3100)", outputList.get(1).toString()); @@ -170,7 +171,7 @@ // test the round up and round down (the maxSize doesn't allow any merges) assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); - outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 16, 1001, 100); assertEquals("merged range size", 3, outputList.size()); assertEquals("range[992,1104)", outputList.get(0).toString()); assertEquals("range[2096,2208)", outputList.get(1).toString()); @@ -188,8 +189,8 @@ public void testSortAndMergeMoreCases() throws Exception { new FileRangeImpl(1000, 100) ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - List outputList = VectoredReadUtils.sortAndMergeRanges( - input, 1, 1001, 2500); + List outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 1, 1001, 2500); assertEquals("merged range size", 1, outputList.size()); CombinedFileRange output = outputList.get(0); assertEquals("merged range underlying size", 4, output.getUnderlying().size()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); - outputList = VectoredReadUtils.sortAndMergeRanges( - input, 100, 1001, 2500); + outputList = VectoredReadUtils.mergeSortedRanges( + Arrays.asList(sortRanges(input)), 100, 1001, 2500); assertEquals("merged range size", 1, outputList.size()); output = outputList.get(0); assertEquals("merged range underlying size", 4, output.getUnderlying().size()); @@ -225,7 +226,7 @@ private void assertEqualRangeCountsAfterMerging(List inputRanges, int minimumSeek, int maxSize) { List combinedFileRanges = VectoredReadUtils - .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize); + .mergeSortedRanges(inputRanges, chunkSize, minimumSeek, maxSize); Assertions.assertThat(combinedFileRanges) .describedAs("Mismatch in number of ranges post merging") .hasSize(inputRanges.size()); @@ -338,7 +339,7 @@ public void testReadVectored() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); // should not merge the ranges - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); Mockito.verify(stream, Mockito.times(3)) .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); for(int b=0; b < input.size(); ++b) { } } + /** + * TODO: Honestly this test doesn't make much sense now as it is similar to the one above. + * Took time to fix this though. If reviewers approve, I will remove it.
+ @throws Exception + */ @Test public void testReadVectoredMerge() throws Exception { List input = Arrays.asList(new FileRangeImpl(2000, 100), @@ -357,12 +363,14 @@ public void testReadVectoredMerge() throws Exception { return null; }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); - // should merge the ranges into a single read - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100); - Mockito.verify(stream, Mockito.times(1)) + // The default vectored read implementation doesn't do merging, thus the + // number of invocations should be equal to the number of ranges. + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); + Mockito.verify(stream, Mockito.times(input.size())) .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); for(int b=0; b < input.size(); ++b) { - validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000); + // setting the start to zero as the buffer is filled separately for each range. + validateBuffer("buffer " + b, input.get(b).getData().get(), 0); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 9d87f26c3c056..d261dba5aa7cb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -22,11 +22,13 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.IntFunction; import com.amazonaws.services.s3.model.GetObjectRequest; @@ -46,7 +48,6 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; -import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.fs.impl.VectoredReadUtils; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; @@ -60,8 +61,8 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.mergeSortedRanges; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -107,6 +108,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private static final int TMP_BUFFER_MAX_SIZE = 64 * 1024; + /** + * Atomic boolean variable to stop all ongoing vectored read operations + * for this input stream. This will be set to true when the stream is + * closed or unbuffer() is called.
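+ * Checks of this flag are cooperative: readSingleRange(), the buffer
+ * population loop and the GET-request retry lambda all call
+ * checkIfVectoredIOStopped(), so in-flight operations terminate with an
+ * InterruptedIOException soon after the flag is set.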
+ */ + private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false); + /** * This is the public position; the one set in {@link #seek(long)} * and returned in {@link #getPos()}. @@ -589,6 +597,7 @@ public synchronized void close() throws IOException { if (!closed) { closed = true; try { + stopVectoredIOOperations.set(true); // close or abort the stream; blocking awaitFuture(closeStream("close() operation", false, true)); LOG.debug("Statistics of stream {}\n{}", key, streamStatistics); @@ -940,31 +949,32 @@ public void readVectored(List ranges, LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); + if (stopVectoredIOOperations.getAndSet(false)) { + LOG.debug("Reinstating vectored read operation for path {} ", pathStr); + } + List sortedRanges = validateNonOverlappingAndReturnSortedRanges(ranges); for (FileRange range : ranges) { validateRangeRequest(range); CompletableFuture result = new CompletableFuture<>(); range.setData(result); } - if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { + if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); - for(FileRange range: ranges) { + for(FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); } } else { LOG.debug("Trying to merge the ranges as they are not disjoint"); - List combinedFileRanges = sortAndMergeRanges(ranges, + List combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); for(CombinedFileRange combinedFileRange: combinedFileRanges) { - CompletableFuture result = new CompletableFuture<>(); - ByteBuffer buffer = allocate.apply(combinedFileRange.getLength()); - combinedFileRange.setData(result); unboundedThreadPool.submit( - () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); } } LOG.debug("Finished submitting vectored read to threadpool" + @@ -972,58 +982,100 @@ public void readVectored(List ranges, } /** - * Read data in the combinedFileRange and update data in buffers - * of all underlying ranges. - * @param combinedFileRange combined range. - * @param buffer combined buffer. + * Read the data from S3 for the bigger combined file range and update all the + * underlying ranges. + * @param combinedFileRange big combined file range. + * @param allocate method to create byte buffers to hold result data. */ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, - ByteBuffer buffer) { - // Not putting read single range call inside try block as - // exception if any occurred during this call will be raised - // during awaitFuture call while getting the combined buffer. - readSingleRange(combinedFileRange, buffer); + IntFunction allocate) { + LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); + S3Object objectRange = null; + S3ObjectInputStream objectContent = null; try { - // In case of single range we return the original byte buffer else - // we return slice byte buffers for each child ranges. 
- ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); - if (combinedFileRange.getUnderlying().size() == 1) { - combinedFileRange.getUnderlying().get(0).getData().complete(combinedBuffer); - } else { - for (FileRange child : combinedFileRange.getUnderlying()) { - updateOriginalRange(child, combinedBuffer, combinedFileRange); - } + checkIfVectoredIOStopped(); + final String operationName = "readCombinedFileRange"; + objectRange = getS3Object(operationName, + combinedFileRange.getOffset(), + combinedFileRange.getLength()); + objectContent = objectRange.getObjectContent(); + if (objectContent == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); } + populateChildBuffers(combinedFileRange, objectContent, allocate); } catch (Exception ex) { - LOG.warn("Exception occurred while reading combined range from file {}", pathStr, ex); + LOG.warn("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex); for(FileRange child : combinedFileRange.getUnderlying()) { child.getData().completeExceptionally(ex); } + } finally { + IOUtils.cleanupWithLogger(LOG, objectRange, objectContent); + } + LOG.debug("Finished reading range {} from path {} ", combinedFileRange, pathStr); + } + + /** + * Populate underlying buffers of the child ranges. + * @param combinedFileRange big combined file range. + * @param objectContent data from s3. + * @param allocate method to allocate child byte buffers. + * @throws IOException any IOE. + */ + private void populateChildBuffers(CombinedFileRange combinedFileRange, + S3ObjectInputStream objectContent, + IntFunction allocate) throws IOException { + // If the combined file range just contains a single child + // range, we only have to fill that one child buffer else + // we drain the intermediate data between consecutive ranges + // and fill the buffers one by one. + if (combinedFileRange.getUnderlying().size() == 1) { + FileRange child = combinedFileRange.getUnderlying().get(0); + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + } else { + FileRange prev = null; + for (FileRange child : combinedFileRange.getUnderlying()) { + if (prev != null && prev.getOffset() + prev.getLength() < child.getOffset()) { + long drainQuantity = child.getOffset() - prev.getOffset() - prev.getLength(); + drainUnnecessaryData(objectContent, drainQuantity); + } + ByteBuffer buffer = allocate.apply(child.getLength()); + populateBuffer(child.getLength(), buffer, objectContent); + child.getData().complete(buffer); + prev = child; + } } } /** - * Update data in child range from combined range. - * @param child child range. - * @param combinedBuffer combined buffer. - * @param combinedFileRange combined range. + * Drain unnecessary data in between ranges. + * @param objectContent s3 data stream. + * @param drainQuantity how many bytes to drain. + * @throws IOException any IOE. 
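+ * Note: draining is only needed for the gaps between the child ranges of a
+ * combined range, and the merge step only bridges gaps of at most
+ * minSeekForVectorReads() bytes, so reading and discarding these bytes is
+ * cheaper than aborting the stream and issuing a separate GET request.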
*/ - private void updateOriginalRange(FileRange child, - ByteBuffer combinedBuffer, - CombinedFileRange combinedFileRange) { - LOG.trace("Start Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); - ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); - child.getData().complete(childBuffer); - LOG.trace("End Filling original range [{}, {}) from combined range [{}, {}) ", - child.getOffset(), child.getLength(), - combinedFileRange.getOffset(), combinedFileRange.getLength()); + private void drainUnnecessaryData(S3ObjectInputStream objectContent, long drainQuantity) + throws IOException { + int drainBytes = 0; + int readCount; + while (drainBytes < drainQuantity) { + if (drainBytes + DRAIN_BUFFER_SIZE <= drainQuantity) { + byte[] drainBuffer = new byte[DRAIN_BUFFER_SIZE]; + readCount = objectContent.read(drainBuffer); + } else { + byte[] drainBuffer = new byte[(int) (drainQuantity - drainBytes)]; + readCount = objectContent.read(drainBuffer); + } + drainBytes += readCount; + } + LOG.debug("{} bytes drained from stream ", drainBytes); } /** - * // Check if we can use contentLength returned by http GET request. * Validates range parameters. + * In case of S3 we already have contentLength from the first GET request + * during an open file operation, so we fail fast here. * @param range requested range. * @throws EOFException end of file exception. */ @@ -1038,13 +1090,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { } /** - * TODO: Add retry in client.getObject(). not present in older reads why here?? - * Okay retry is being done in the top layer during read. - * But if we do here in the top layer, one issue I am thinking is - * what if there is some error which happened during filling the buffer - * If we retry that old offsets of heap buffers can be overwritten ? - * I think retry should be only added in {@link S3AInputStream#getS3Object} - * Read data from S3 for this range and populate the bufffer. + * Read data from S3 for this range and populate the buffer. * @param range range of data to read. * @param buffer buffer to fill. */ @@ -1053,6 +1099,7 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) { S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { + checkIfVectoredIOStopped(); long position = range.getOffset(); int length = range.getLength(); final String operationName = "readRange"; @@ -1089,6 +1136,7 @@ private void populateBuffer(int length, int offset = 0; byte[] tmp = new byte[TMP_BUFFER_MAX_SIZE]; while (readBytes < length) { + checkIfVectoredIOStopped(); int currentLength = readBytes + TMP_BUFFER_MAX_SIZE < length ? TMP_BUFFER_MAX_SIZE : length - readBytes; @@ -1103,7 +1151,15 @@ private void populateBuffer(int length, } } - public void readByteArray(S3ObjectInputStream objectContent, + /** + * Read data into destination buffer from s3 object content. + * @param objectContent result from S3. + * @param dest destination buffer. + * @param offset start offset of dest buffer. + * @param length number of bytes to fill in dest. + * @throws IOException any IOE. + */ + private void readByteArray(S3ObjectInputStream objectContent, byte[] dest, int offset, int length) throws IOException { @@ -1120,14 +1176,14 @@ public void readByteArray(S3ObjectInputStream objectContent, } /** - * Read data from S3 using a http request.
- * This also handles if file has been changed while http call - * is getting executed. If file has been changed RemoteFileChangedException - * is thrown. + * Read data from S3 using an http request with retries. + * This also detects if the file has been changed while the http + * call is executing; if the file has been changed, + * RemoteFileChangedException is thrown. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. - * @return S3Object + * @return S3Object the resulting S3 object. * @throws IOException exception if any. */ private S3Object getS3Object(String operationName, long position, @@ -1140,7 +1196,11 @@ private S3Object getS3Object(String operationName, long position, Invoker invoker = context.getReadInvoker(); try { objectRange = invoker.retry(operationName, pathStr, true, - () -> client.getObject(request)); + () -> { + checkIfVectoredIOStopped(); + return client.getObject(request); + }); + } catch (IOException ex) { tracker.failed(); throw ex; @@ -1152,6 +1212,12 @@ private S3Object getS3Object(String operationName, long position, return objectRange; } + /** + * Check whether vectored io operations have been stopped, which happens + * when the stream is closed or unbuffer() is called. + * @throws InterruptedIOException so that all running vectored io + * operations terminate and release their resources. + */ + private void checkIfVectoredIOStopped() throws InterruptedIOException { + if (stopVectoredIOOperations.get()) { + throw new InterruptedIOException("Stream closed or unbuffer is called"); + } + } + /** * Access the input stream statistics. * This is for internal testing and may be removed without warning. @@ -1241,6 +1307,7 @@ public static long validateReadahead(@Nullable Long readahead) { @Override public synchronized void unbuffer() { try { + stopVectoredIOOperations.set(true); closeStream("unbuffer()", false, false); } finally { streamStatistics.unbuffered(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 0752e75d2478a..d7594808b1925 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; @@ -33,7 +34,9 @@ import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { @@ -99,4 +102,52 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { } } + + @Test + public void testStopVectoredIoOperationsCloseStream() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeRandomRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, allocate); + in.close(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + } + // reopening the stream should succeed.
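+ // (the stop flag is per-stream state: a freshly opened stream starts
+ // with it unset, so its vectored reads run normally.)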
+ try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + } + } + + @Test + public void testStopVectoredIoOperationsUnbuffer() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeRandomRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ + in.readVectored(fileRanges, allocate); + in.unbuffer(); + LambdaTestUtils.intercept(InterruptedIOException.class, + () -> validateVectoredReadResult(fileRanges, DATASET)); + // re-initiating the vectored reads after unbuffer should succeed. + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + } + + } + + @Override + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = getSampleOverlappingRanges(); + validateUnsupportedOperation(fs, fileRanges); + } + + @Override + public void testSameRanges() throws Exception { + // Same ranges are a special case of overlapping ranges. + FileSystem fs = getFileSystem(); + List fileRanges = getSampleSameRanges(); + validateUnsupportedOperation(fs, fileRanges); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 956e23a3f11c3..59228d8a19903 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -459,10 +459,10 @@ public void test_040_PositionedReadHugeFile() throws Throwable { public void test_045_vectoredIOHugeFile() throws Throwable { assumeHugeFileExists(); List rangeList = new ArrayList<>(); - rangeList.add(new FileRangeImpl(5856368, 1167716)); - rangeList.add(new FileRangeImpl(3520861, 1167700)); - rangeList.add(new FileRangeImpl(8191913, 1167775)); - rangeList.add(new FileRangeImpl(1520861, 1167700)); + rangeList.add(new FileRangeImpl(5856368, 116770)); + rangeList.add(new FileRangeImpl(3520861, 116770)); + rangeList.add(new FileRangeImpl(8191913, 116770)); + rangeList.add(new FileRangeImpl(1520861, 116770)); rangeList.add(new FileRangeImpl(2520861, 116770)); rangeList.add(new FileRangeImpl(9191913, 116770)); rangeList.add(new FileRangeImpl(2820861, 156770)); From b0b52f2bf4e47579a0d3d2ee4898afebd6a15ab6 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Mon, 13 Jun 2022 12:16:55 -0500 Subject: [PATCH 2/6] Refactoring and stream capabilities.
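This patch moves FileRangeImpl into org.apache.hadoop.fs.impl, promotes VectoredReadUtils into org.apache.hadoop.fs, adds a FileRange.createFileRange() factory, and exposes a "readvectored" stream capability. A minimal sketch of how client code is expected to use the API after this refactoring; the class name, file path, and range values here are illustrative only, not part of the patch:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.impl.FutureIOSupport;

    public class VectoredReadClient {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Ranges are created through the new interface factory rather than by
        // instantiating FileRangeImpl, which this patch moves into fs.impl.
        List<FileRange> ranges = Arrays.asList(
            FileRange.createFileRange(0, 100),
            FileRange.createFileRange(110, 50));
        try (FSDataInputStream in = fs.open(new Path("/tmp/vectored_file.txt"))) {
          // Streams with a native vectored read report the new capability;
          // others fall back to the default PositionedReadable implementation.
          boolean nativeVectored = in.hasCapability("readvectored");
          in.readVectored(ranges, ByteBuffer::allocate);
          for (FileRange range : ranges) {
            // Each range carries its own future, completed with its own buffer.
            ByteBuffer buffer = FutureIOSupport.awaitFuture(range.getData());
            System.out.printf("offset %d: %d bytes read (native=%b)%n",
                range.getOffset(), buffer.remaining(), nativeVectored);
          }
        }
      }
    }

The factory keeps the implementation class out of the public API, while the capability probe lets applications discover whether a stream implements vectored reads natively.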
--- .../apache/hadoop/fs/ChecksumFileSystem.java | 10 ++++-- .../java/org/apache/hadoop/fs/FileRange.java | 6 ++++ .../apache/hadoop/fs/PositionedReadable.java | 1 - .../apache/hadoop/fs/RawLocalFileSystem.java | 4 +-- .../apache/hadoop/fs/StreamCapabilities.java | 9 ++++++ .../fs/{impl => }/VectoredReadUtils.java | 3 +- .../hadoop/fs/impl/CombinedFileRange.java | 1 - .../hadoop/fs/{ => impl}/FileRangeImpl.java | 4 ++- .../fs/{impl => }/TestVectoredReadUtils.java | 10 +++--- .../AbstractContractVectoredReadTest.java | 31 +++++++++++++++++-- .../hadoop/fs/contract/ContractTestUtils.java | 13 ++++++++ .../TestLocalFSContractVectoredRead.java | 2 +- .../apache/hadoop/fs/s3a/S3AInputStream.java | 15 ++++----- .../s3a/ITestS3AContractVectoredRead.java | 8 ++++- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 2 +- .../benchmark/VectoredReadBenchmark.java | 2 +- 16 files changed, 92 insertions(+), 29 deletions(-) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{impl => }/VectoredReadUtils.java (99%) rename hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/{ => impl}/FileRangeImpl.java (96%) rename hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/{impl => }/TestVectoredReadUtils.java (98%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index d706226b51f8e..1cca9fe2bfdb1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; @@ -55,7 +54,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; /**************************************************************** * Abstract Checksumed FileSystem. @@ -167,7 +166,7 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) { * It verifies that data matches checksums. 
*******************************************************/ private static class ChecksumFSInputChecker extends FSInputChecker implements - IOStatisticsSource { + IOStatisticsSource, StreamCapabilities { private ChecksumFileSystem fs; private FSDataInputStream datas; private FSDataInputStream sums; @@ -436,6 +435,11 @@ public void readVectored(List ranges, } } } + + @Override + public boolean hasCapability(String capability) { + return datas.hasCapability(capability); + } } private static class FSDataBoundedInputStream extends FSDataInputStream { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java index 7388e462cc27b..6bfbf6f8c47b3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.fs.impl.FileRangeImpl; + /** * A byte range of a file. * This is used for the asynchronous gather read API of @@ -52,4 +54,8 @@ public interface FileRange { * @param data the future of the ByteBuffer that will have the data */ void setData(CompletableFuture data); + + /** + * Factory method to create a FileRange object. + * @param offset starting offset of the range. + * @param length length of the range. + * @return a new instance of FileRangeImpl. + */ + static FileRange createFileRange(long offset, int length) { + return new FileRangeImpl(offset, length); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 26d7b3e90904e..de76090512705 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -25,7 +25,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.impl.VectoredReadUtils; /** * Stream that permits positional reading. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index eece36310e131..f525c3cba78fe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -20,7 +20,6 @@ package org.apache.hadoop.fs; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.impl.VectoredReadUtils; import java.io.BufferedOutputStream; import java.io.DataOutput; @@ -68,7 +67,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS; @@ -279,6 +278,7 @@ public boolean hasCapability(String capability) { // new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: + case StreamCapabilities.VECTOREDIO: return true; default: return false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 861178019505e..2a628306e6e57 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -18,6 +18,9 @@ package org.apache.hadoop.fs; +import java.util.List; +import java.util.function.IntFunction; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -80,6 +83,12 @@ public interface StreamCapabilities { */ String IOSTATISTICS = "iostatistics"; + /** + * Support for vectored IO api. + * See {@link PositionedReadable#readVectored(List, IntFunction)}. + */ + String VECTOREDIO = "readvectored"; + /** * Stream abort() capability implemented by {@link Abortable#abort()}. * This matches the Path Capability diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java similarity index 99% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index c3efd2e4695dd..bf1428712ec75 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.EOFException; import java.io.IOException; @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.Preconditions; /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java index 828a50b4f7bf7..516bbb2c70c76 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.impl; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; import java.util.ArrayList; import java.util.List; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java similarity index 96% rename from hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java rename to hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index ef5851154be00..34cfc92f17e80 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.fs; +package org.apache.hadoop.fs.impl; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.fs.FileRange; + /** * A range of bytes from a file with an optional buffer to read those bytes * for zero copy. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java similarity index 98% rename from hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java rename to hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java index 4bf842c70dc9e..a54df1b81b8d9 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hadoop.fs.impl; +package org.apache.hadoop.fs; import java.io.IOException; import java.nio.ByteBuffer; @@ -31,13 +31,11 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.impl.FileRangeImpl; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.test.HadoopTestBase; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges; import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionally; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index af90d62247be4..65416de30a74c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -38,15 +38,18 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; @RunWith(Parameterized.class) @@ -91,6 +94,15 @@ public void teardown() throws Exception { pool.release(); } + @Test + public void testVectoredReadCapability() throws Exception { + FileSystem fs = getFileSystem(); + String[] vectoredReadCapability = new String[]{StreamCapabilities.VECTOREDIO}; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + assertCapabilities(in, vectoredReadCapability, null); + } + } + @Test public void testVectoredReadMultipleRanges() throws Exception { FileSystem fs = getFileSystem(); @@ -110,6 +122,7 @@ public void testVectoredReadMultipleRanges() throws Exception { combinedFuture.get(); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -126,6 +139,7 @@ public void testVectoredReadAndReadFully() throws Exception { Assertions.assertThat(vecRes) .describedAs("Result from vectored read and readFully must match") .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -143,6 +157,7 @@ public void testDisjointRanges() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, 
DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -160,6 +175,7 @@ public void testAllRangesMergedIntoOne() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -179,6 +195,7 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -189,6 +206,7 @@ public void testOverlappingRanges() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -200,6 +218,7 @@ public void testSameRanges() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -213,8 +232,8 @@ protected List getSampleSameRanges() { protected List getSampleOverlappingRanges() { List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(100, 500)); - fileRanges.add(new FileRangeImpl(400, 500)); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(400, 500)); return fileRanges; } protected void validateUnsupportedOperation(FileSystem fs, @@ -240,6 +259,7 @@ public void testSomeRandomNonOverlappingRanges() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -253,6 +273,7 @@ public void testConsecutiveRanges() throws Exception { try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -310,6 +331,7 @@ public void testNormalReadAfterVectoredRead() throws Exception { .describedAs("Vectored read shouldn't change file pointer.") .isEqualTo(200); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -328,6 +350,7 @@ public void testVectoredReadAfterNormalRead() throws Exception { .isEqualTo(200); in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -341,6 +364,8 @@ public void testMultipleVectoredReads() throws Exception { in.readVectored(fileRanges2, allocate); validateVectoredReadResult(fileRanges2, DATASET); validateVectoredReadResult(fileRanges1, DATASET); + returnBuffersToPoolPostRead(fileRanges1, pool); + returnBuffersToPoolPostRead(fileRanges2, pool); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index a316b02fb0844..b42d8184f0690 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -29,6 +29,7 
@@ import org.apache.hadoop.fs.PathCapabilities; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.functional.FutureIO; @@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.EOFException; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -1136,6 +1138,17 @@ public static void validateVectoredReadResult(List fileRanges, } } + public static void returnBuffersToPoolPostRead(List fileRanges, + ByteBufferPool pool) + throws IOException, TimeoutException { + for (FileRange range : fileRanges) { + ByteBuffer buffer = FutureIO.awaitFuture(range.getData(), + VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS, + TimeUnit.SECONDS); + pool.putBuffer(buffer); + } + } + /** * Assert that the data read matches the dataset at the given offset. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 3ac7ca694af62..220762678890c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -30,7 +30,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index d261dba5aa7cb..dea3f350688d3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -48,7 +48,7 @@ import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.impl.CombinedFileRange; -import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.VectoredReadUtils; import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.statistics.DurationTracker; @@ -60,9 +60,9 @@ import static java.util.Objects.requireNonNull; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.mergeSortedRanges; -import static org.apache.hadoop.fs.impl.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges; +import static org.apache.hadoop.fs.VectoredReadUtils.validateNonOverlappingAndReturnSortedRanges; import static org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration; import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -110,8 +110,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, /** * Atomic boolean variable to stop all ongoing vectored read operation - * for this input stream. This will be set to true stream is closed or - * unbuffer is called. + * for this input stream. This will be set to true when the stream is + * closed or unbuffer is called. */ private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false); @@ -1090,7 +1090,7 @@ private void validateRangeRequest(FileRange range) throws EOFException { } /** - * Read data from S3 for this range and populate the buffer.. + * Read data from S3 for this range and populate the buffer. * @param range range of data to read. * @param buffer buffer to fill. */ @@ -1320,6 +1320,7 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.READAHEAD: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.VECTOREDIO: return true; default: return false; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index d7594808b1925..efc84a00e6535 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -136,6 +136,9 @@ public void testStopVectoredIoOperationsUnbuffer() throws Exception { } + /** + * S3 vectored IO doesn't support overlapping ranges. + */ @Override public void testOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); @@ -143,6 +146,9 @@ public void testOverlappingRanges() throws Exception { validateUnsupportedOperation(fs, fileRanges); } + /** + * S3 vectored IO doesn't support overlapping ranges. + */ @Override public void testSameRanges() throws Exception { // Same ranges are special case of overlapping only. 
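As a usage sketch for the `StreamCapabilities.VECTOREDIO` probe added above (illustrative only, not part of this patch; the path, ranges, and class name here are assumptions):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public final class VectoredReadProbe {
  public static void readIfSupported(FileSystem fs) throws Exception {
    try (FSDataInputStream in = fs.open(new Path("/data/vectored_file.txt"))) {
      // With this patch, S3A streams report the capability; streams that
      // don't still work via the default readVectored() implementation.
      if (in.hasCapability(StreamCapabilities.VECTOREDIO)) {
        List<FileRange> ranges = new ArrayList<>();
        ranges.add(FileRange.createFileRange(0, 100));
        ranges.add(FileRange.createFileRange(4_096, 100));
        // Kick off the asynchronous reads; each range carries its own future.
        in.readVectored(ranges, ByteBuffer::allocate);
        for (FileRange range : ranges) {
          ByteBuffer buffer = range.getData().get(); // await each range
          // ... consume buffer ...
        }
      }
    }
  }
}
```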
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 59228d8a19903..6e54f4523498a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java index aaee951d72cac..3b39ef2e20f57 100644 --- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ -47,7 +47,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; From 2db705af1877107256aa155069da3500bfdd9ba3 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Wed, 15 Jun 2022 13:43:34 -0500 Subject: [PATCH 3/6] Steve's comments --- .../java/org/apache/hadoop/fs/FileRange.java | 6 + .../apache/hadoop/fs/StreamCapabilities.java | 5 +- .../apache/hadoop/fs/VectoredReadUtils.java | 17 +- .../apache/hadoop/fs/impl/FileRangeImpl.java | 5 +- .../hadoop/fs/TestChecksumFileSystem.java | 3 +- .../hadoop/fs/TestVectoredReadUtils.java | 133 +++++++-------- .../AbstractContractVectoredReadTest.java | 160 +++++++++--------- .../hadoop/fs/contract/ContractTestUtils.java | 9 +- .../TestLocalFSContractVectoredRead.java | 9 +- .../apache/hadoop/fs/s3a/S3AInputStream.java | 27 ++- .../s3a/ITestS3AContractVectoredRead.java | 26 +-- .../s3a/scale/AbstractSTestS3AHugeFiles.java | 14 +- .../benchmark/VectoredReadBenchmark.java | 2 +- 13 files changed, 224 insertions(+), 192 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java index 6bfbf6f8c47b3..e55696e96507e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -55,6 +55,12 @@ public interface FileRange { */ void setData(CompletableFuture data); + /** + * Factory method to create a FileRange object. + * @param offset starting offset of the range. + * @param length length of the range. + * @return a new instance of FileRangeImpl. 
+ */ static FileRange createFileRange(long offset, int length) { return new FileRangeImpl(offset, length); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 2a628306e6e57..d68ef505dc3fe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -18,9 +18,6 @@ package org.apache.hadoop.fs; -import java.util.List; -import java.util.function.IntFunction; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -85,7 +82,7 @@ public interface StreamCapabilities { /** * Support for vectored IO api. - * See {@link PositionedReadable#readVectored(List, IntFunction)}. + * See {@code PositionedReadable#readVectored(List, IntFunction)}. */ String VECTOREDIO = "readvectored"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index bf1428712ec75..64107f1a18f89 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -28,9 +28,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.IntFunction; -import org.apache.hadoop.fs.ByteBufferPositionedReadable; -import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.Preconditions; @@ -80,7 +77,7 @@ public static void validateVectoredReadRanges(List ranges) public static void readVectored(PositionedReadable stream, List ranges, IntFunction allocate) { - for(FileRange range: ranges) { + for (FileRange range: ranges) { range.setData(readRangeFrom(stream, range, allocate)); } } @@ -151,7 +148,7 @@ public static boolean isOrderedDisjoint(List input, int chunkSize, int minimumSeek) { long previous = -minimumSeek; - for(FileRange range: input) { + for (FileRange range: input) { long offset = range.getOffset(); long end = range.getOffset() + range.getLength(); if (offset % chunkSize != 0 || @@ -209,7 +206,7 @@ public static List validateNonOverlappingAndReturnSortedRan } FileRange[] sortedRanges = sortRanges(input); FileRange prev = sortedRanges[0]; - for(int i=1; i validateNonOverlappingAndReturnSortedRan return Arrays.asList(sortedRanges); } + /** + * Sort the input ranges by offset. + * @param input input ranges. + * @return sorted ranges. 
+ */ public static FileRange[] sortRanges(List input) { - // sort the ranges by offset FileRange[] sortedRanges = input.toArray(new FileRange[0]); Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset)); return sortedRanges; @@ -250,7 +251,7 @@ public static List mergeSortedRanges(List result = new ArrayList<>(sortedRanges.size()); // now merge together the ones that merge - for(FileRange range: sortedRanges) { + for (FileRange range: sortedRanges) { long start = roundDown(range.getOffset(), chunkSize); long end = roundUp(range.getOffset() + range.getLength(), chunkSize); if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java index 34cfc92f17e80..041e5f0a8d2d7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java @@ -20,12 +20,15 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileRange; /** * A range of bytes from a file with an optional buffer to read those bytes - * for zero copy. + * for zero copy. Instances shouldn't be created directly via the constructor; + * the factory defined in {@code FileRange#createFileRange} should be used instead. */ +@InterfaceAudience.Private public class FileRangeImpl implements FileRange { private long offset; private int length; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java index c912f11993cb0..4d61154490838 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestChecksumFileSystem.java @@ -19,8 +19,9 @@ package org.apache.hadoop.fs; import java.util.Arrays; -import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.permission.FsPermission; import static org.apache.hadoop.fs.FileSystemTestHelper.*; import org.apache.hadoop.conf.Configuration; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java index a54df1b81b8d9..5d08b02e113d5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java @@ -31,7 +31,6 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.test.HadoopTestBase; @@ -55,7 +54,7 @@ public void testSliceTo() { } // ensure we don't make unnecessary slices ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, - new FileRangeImpl(100, size)); + FileRange.createFileRange(100, size)); Assertions.assertThat(buffer) .describedAs("Slicing on the same offset shouldn't " + "create a new buffer") .isEqualTo(slice); @@
-66,7 +65,7 @@ public void testSliceTo() { final int sliceStart = 1024; final int sliceLength = 16 * 1024; slice = VectoredReadUtils.sliceTo(buffer, offset, - new FileRangeImpl(offset + sliceStart, sliceLength)); + FileRange.createFileRange(offset + sliceStart, sliceLength)); // make sure they aren't the same, but use the same backing data Assertions.assertThat(buffer) .describedAs("Slicing on new offset should " + @@ -95,12 +94,12 @@ public void testRounding() { @Test public void testMerge() { - FileRange base = new FileRangeImpl(2000, 1000); + FileRange base = FileRange.createFileRange(2000, 1000); CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); // test when the gap between is too big assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2000, 4000)); + FileRange.createFileRange(5000, 1000), 2000, 4000)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -108,7 +107,7 @@ public void testMerge() { // test when the total size gets exceeded assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 3999)); + FileRange.createFileRange(5000, 1000), 2001, 3999)); assertEquals("Number of ranges in merged range shouldn't increase", 1, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); @@ -116,7 +115,7 @@ public void testMerge() { // test when the merge works assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000, - new FileRangeImpl(5000, 1000), 2001, 4000)); + FileRange.createFileRange(5000, 1000), 2001, 4000)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 2000, mergeBase.getOffset()); assertEquals("post merge length", 4000, mergeBase.getLength()); @@ -126,7 +125,7 @@ public void testMerge() { assertEquals(200, mergeBase.getOffset()); assertEquals(100, mergeBase.getLength()); assertTrue("ranges should get merged ", mergeBase.merge(500, 600, - new FileRangeImpl(5000, 1000), 201, 400)); + FileRange.createFileRange(5000, 1000), 201, 400)); assertEquals("post merge size", 2, mergeBase.getUnderlying().size()); assertEquals("post merge offset", 200, mergeBase.getOffset()); assertEquals("post merge length", 400, mergeBase.getLength()); @@ -135,42 +134,58 @@ public void testMerge() { @Test public void testSortAndMerge() { List input = Arrays.asList( - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); List outputList = VectoredReadUtils.mergeSortedRanges( Arrays.asList(sortRanges(input)), 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 3, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(3); assertEquals("range[1000,3100)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // the minSeek doesn't allow 
the first two to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 100, 1000, 2100); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1000, 2100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,1100)", outputList.get(0).toString()); assertEquals("range[2100,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); // the maxSize doesn't allow the third range to merge - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 100, 1001, 2099); - assertEquals("merged range size", 2, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 100, 1001, 2099); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(2); assertEquals("range[1000,2200)", outputList.get(0).toString()); assertEquals("range[3000,3100)", outputList.get(1).toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); // test the round up and round down (the maxSize doesn't allow any merges) - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); - outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), 16, 1001, 100); - assertEquals("merged range size", 3, outputList.size()); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)), + 16, 1001, 100); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(3); assertEquals("range[992,1104)", outputList.get(0).toString()); assertEquals("range[2096,2208)", outputList.get(1).toString()); assertEquals("range[2992,3104)", outputList.get(2).toString()); @@ -181,26 +196,35 @@ public void testSortAndMerge() { @Test public void testSortAndMergeMoreCases() throws Exception { List input = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100), - new FileRangeImpl(1000, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100), + FileRange.createFileRange(1000, 100) ); - assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + assertFalse("Ranges are non disjoint", + VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); List outputList = VectoredReadUtils.mergeSortedRanges( Arrays.asList(sortRanges(input)), 1, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); CombinedFileRange output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying 
size") + .hasSize(4); assertEquals("range[1000,3110)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); outputList = VectoredReadUtils.mergeSortedRanges( Arrays.asList(sortRanges(input)), 100, 1001, 2500); - assertEquals("merged range size", 1, outputList.size()); + Assertions.assertThat(outputList) + .describedAs("merged range size") + .hasSize(1); output = outputList.get(0); - assertEquals("merged range underlying size", 4, output.getUnderlying().size()); + Assertions.assertThat(output.getUnderlying()) + .describedAs("merged range underlying size") + .hasSize(4); assertEquals("range[1000,3200)", output.toString()); assertTrue("merged output ranges are disjoint", VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); @@ -210,9 +234,9 @@ public void testSortAndMergeMoreCases() throws Exception { @Test public void testMaxSizeZeroDisablesMering() throws Exception { List randomRanges = Arrays.asList( - new FileRangeImpl(3000, 110), - new FileRangeImpl(3000, 100), - new FileRangeImpl(2100, 100) + FileRange.createFileRange(3000, 110), + FileRange.createFileRange(3000, 100), + FileRange.createFileRange(2100, 100) ); assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0); assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0); @@ -250,7 +274,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { }).when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -266,7 +290,7 @@ public void testReadRangeFromByteBufferPositionedReadable() throws Exception { .when(stream).readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -285,7 +309,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); CompletableFuture result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), allocate); assertFutureCompletedSuccessfully(result); ByteBuffer buffer = result.get(); @@ -302,7 +326,7 @@ static void runReadRangeFromPositionedReadable(IntFunction allocate) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt()); result = - VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + VectoredReadUtils.readRangeFrom(stream, FileRange.createFileRange(1000, 100), ByteBuffer::allocate); assertFutureFailedExceptionally(result); } @@ -327,9 +351,9 @@ static void validateBuffer(String message, ByteBuffer buffer, int start) { @Test public void testReadVectored() throws Exception { - List input = Arrays.asList(new FileRangeImpl(0, 100), - new FileRangeImpl(100_000, 100), - new FileRangeImpl(200_000, 100)); + List input = Arrays.asList(FileRange.createFileRange(0, 100), + FileRange.createFileRange(100_000, 100), + FileRange.createFileRange(200_000, 100)); Stream stream = Mockito.mock(Stream.class); Mockito.doAnswer(invocation -> { 
fillBuffer(invocation.getArgument(1)); @@ -344,31 +368,4 @@ public void testReadVectored() throws Exception { validateBuffer("buffer " + b, input.get(b).getData().get(), 0); } } - - /** - * TODO: Honestly this test doesn't makes sense much now as it is similar to above. - * Took time to fix this though. If you guys approve, I will remove. - * @throws Exception - */ - @Test - public void testReadVectoredMerge() throws Exception { - List input = Arrays.asList(new FileRangeImpl(2000, 100), - new FileRangeImpl(1000, 100), - new FileRangeImpl(0, 100)); - Stream stream = Mockito.mock(Stream.class); - Mockito.doAnswer(invocation -> { - fillBuffer(invocation.getArgument(1)); - return null; - }).when(stream).readFully(ArgumentMatchers.anyLong(), - ArgumentMatchers.any(ByteBuffer.class)); - // Default vectored read implementation doesn't do merging thus - // number of invocation should be equal to number of ranges. - VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate); - Mockito.verify(stream, Mockito.times(input.size())) - .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); - for(int b=0; b < input.size(); ++b) { - // setting the start to zero as buffer will be filled separately for each range. - validateBuffer("buffer " + b, input.get(b).getData().get(), 0); - } - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index 65416de30a74c..b8d225d49c12a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.contract; import java.io.EOFException; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -38,8 +37,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.StreamCapabilities; -import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.impl.FutureIOSupport; @@ -61,9 +60,9 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; - protected final IntFunction allocate; + private final IntFunction allocate; - private WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + private final WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); private final String bufferType; @@ -80,6 +79,10 @@ public AbstractContractVectoredReadTest(String bufferType) { }; } + public IntFunction getAllocate() { + return allocate; + } + @Override public void setup() throws Exception { super.setup(); @@ -108,7 +111,7 @@ public void testVectoredReadMultipleRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); for (int i = 0; i < 10; i++) { - FileRange fileRange = new FileRangeImpl(i * 100, 100); + FileRange fileRange = FileRange.createFileRange(i * 100, 100); fileRanges.add(fileRange); } try 
(FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { @@ -130,7 +133,7 @@ public void testVectoredReadMultipleRanges() throws Exception { public void testVectoredReadAndReadFully() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(100, 100)); + fileRanges.add(FileRange.createFileRange(100, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); byte[] readFullRes = new byte[100]; @@ -151,9 +154,9 @@ public void testVectoredReadAndReadFully() throws Exception { public void testDisjointRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 * 1024 + 101, 100)); - fileRanges.add(new FileRangeImpl(16 * 1024 + 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 + 101, 100)); + fileRanges.add(FileRange.createFileRange(16_000 + 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); @@ -169,9 +172,9 @@ public void testDisjointRanges() throws Exception { public void testAllRangesMergedIntoOne() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(4 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(8*1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(4_000 - 101, 100)); + fileRanges.add(FileRange.createFileRange(8_000 - 101, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); @@ -187,12 +190,17 @@ public void testAllRangesMergedIntoOne() throws Exception { public void testSomeRangesMergedSomeUnmerged() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 100)); - fileRanges.add(new FileRangeImpl(14*1024, 100)); - fileRanges.add(new FileRangeImpl(10*1024, 100)); - fileRanges.add(new FileRangeImpl(2 *1024 - 101, 100)); - fileRanges.add(new FileRangeImpl(40*1024, 1024)); - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); @@ -203,7 +211,12 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception { public void testOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = getSampleOverlappingRanges(); - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture 
builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); returnBuffersToPoolPostRead(fileRanges, pool); @@ -215,36 +228,13 @@ public void testSameRanges() throws Exception { // Same ranges are special case of overlapping only. FileSystem fs = getFileSystem(); List fileRanges = getSampleSameRanges(); - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - in.readVectored(fileRanges, allocate); - validateVectoredReadResult(fileRanges, DATASET); - returnBuffersToPoolPostRead(fileRanges, pool); - } - } - - protected List getSampleSameRanges() { - List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - fileRanges.add(new FileRangeImpl(8*1024, 1000)); - return fileRanges; - } - - protected List getSampleOverlappingRanges() { - List fileRanges = new ArrayList<>(); - fileRanges.add(FileRange.createFileRange(100, 500)); - fileRanges.add(FileRange.createFileRange(400, 500)); - return fileRanges; - } - protected void validateUnsupportedOperation(FileSystem fs, - List fileRanges) - throws Exception { CompletableFuture builder = fs.openFile(path(VECTORED_READ_FILE_NAME)) .build(); try (FSDataInputStream in = builder.get()) { - LambdaTestUtils.intercept(UnsupportedOperationException.class, - () -> in.readVectored(fileRanges, allocate)); + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, pool); } } @@ -252,10 +242,10 @@ protected void validateUnsupportedOperation(FileSystem fs, public void testSomeRandomNonOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(500, 100)); - fileRanges.add(new FileRangeImpl(1000, 200)); - fileRanges.add(new FileRangeImpl(50, 10)); - fileRanges.add(new FileRangeImpl(10, 5)); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(1000, 200)); + fileRanges.add(FileRange.createFileRange(50, 10)); + fileRanges.add(FileRange.createFileRange(10, 5)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); @@ -267,9 +257,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception { public void testConsecutiveRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(500, 100)); - fileRanges.add(new FileRangeImpl(600, 200)); - fileRanges.add(new FileRangeImpl(800, 100)); + fileRanges.add(FileRange.createFileRange(500, 100)); + fileRanges.add(FileRange.createFileRange(600, 200)); + fileRanges.add(FileRange.createFileRange(800, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); validateVectoredReadResult(fileRanges, DATASET); @@ -281,7 +271,7 @@ public void testConsecutiveRanges() throws Exception { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); for (FileRange 
res : fileRanges) { @@ -304,22 +294,22 @@ public void testEOFRanges() throws Exception { public void testNegativeLengthRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, -50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(0, -50)); + testExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class); } @Test public void testNegativeOffsetRange() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(-1, 50)); - testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + fileRanges.add(FileRange.createFileRange(-1, 50)); + testExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test public void testNormalReadAfterVectoredRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeRandomRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges, allocate); // read starting 200 bytes @@ -338,7 +328,7 @@ public void testNormalReadAfterVectoredRead() throws Exception { @Test public void testVectoredReadAfterNormalRead() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeRandomRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { // read starting 200 bytes byte[] res = new byte[200]; @@ -357,8 +347,8 @@ public void testVectoredReadAfterNormalRead() throws Exception { @Test public void testMultipleVectoredReads() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges1 = createSomeRandomRanges(); - List fileRanges2 = createSomeRandomRanges(); + List fileRanges1 = createSampleNonOverlappingRanges(); + List fileRanges2 = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { in.readVectored(fileRanges1, allocate); in.readVectored(fileRanges2, allocate); @@ -369,27 +359,45 @@ public void testMultipleVectoredReads() throws Exception { } } - protected List createSomeRandomRanges() { + protected List createSampleNonOverlappingRanges() { List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(0, 100)); - fileRanges.add(new FileRangeImpl(110, 50)); + fileRanges.add(FileRange.createFileRange(0, 100)); + fileRanges.add(FileRange.createFileRange(110, 50)); return fileRanges; } + protected List getSampleSameRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + fileRanges.add(FileRange.createFileRange(8_000, 1000)); + return fileRanges; + } + + protected List getSampleOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(400, 500)); + return fileRanges; + } - protected void testExceptionalVectoredRead(FileSystem fs, + /** + * Validate that exceptions must be thrown during a vectored + * read operation with specific input ranges. + * @param fs FileSystem instance. + * @param fileRanges input file ranges. + * @param clazz type of exception expected. + * @throws Exception any other IOE. 
+ */ protected void testExceptionalVectoredRead(FileSystem fs, List fileRanges, - String s) throws IOException { - boolean exRaised = false; - try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { - // Can we intercept here as done in S3 tests ?? - in.readVectored(fileRanges, allocate); - } catch (EOFException | IllegalArgumentException ex) { - // expected. - exRaised = true; + Class clazz) throws Exception { + CompletableFuture builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .build(); + try (FSDataInputStream in = builder.get()) { + LambdaTestUtils.intercept(clazz, + () -> in.readVectored(fileRanges, allocate)); } - Assertions.assertThat(exRaised) - .describedAs(s) - .isTrue(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index b42d8184f0690..0bd147c43bbca 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.EOFException; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -1138,6 +1137,14 @@ public static void validateVectoredReadResult(List fileRanges, } } + /** + * Utility to return buffers back to the pool once all + * data has been read for each file range. + * @param fileRanges list of file ranges. + * @param pool buffer pool. + * @throws IOException any IOE + * @throws TimeoutException ideally this should never occur. + */ public static void returnBuffersToPoolPostRead(List fileRanges, ByteBufferPool pool) throws IOException, TimeoutException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 220762678890c..934de6eca38e8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; @@ -65,10 +64,10 @@ public void testChecksumValidationDuringVectoredRead() throws Exception { .isTrue(); CompletableFuture fis = localFs.openFile(testPath).build(); List someRandomRanges = new ArrayList<>(); - someRandomRanges.add(new FileRangeImpl(10, 1024)); - someRandomRanges.add(new FileRangeImpl(1025, 1024)); + someRandomRanges.add(FileRange.createFileRange(10, 1024)); + someRandomRanges.add(FileRange.createFileRange(1025, 1024)); try (FSDataInputStream in = fis.get()){ - in.readVectored(someRandomRanges, allocate); + in.readVectored(someRandomRanges, getAllocate()); validateVectoredReadResult(someRandomRanges, DATASET_CORRECT); } byte[] DATASET_CORRUPTED = ContractTestUtils.dataset(DATASET_LEN, 'a', 64); try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){ out.write(DATASET_CORRUPTED); } @@ -77,7 +76,7 @@ public void
testChecksumValidationDuringVectoredRead() throws Exception { } CompletableFuture fisN = localFs.openFile(testPath).build(); try (FSDataInputStream in = fisN.get()){ - in.readVectored(someRandomRanges, allocate); + in.readVectored(someRandomRanges, getAllocate()); // Expect checksum exception when data is updated directly through // raw local fs instance. intercept(ChecksumException.class, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index dea3f350688d3..e5c279e22e139 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -961,7 +961,7 @@ public void readVectored(List ranges, if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); - for(FileRange range: sortedRanges) { + for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); } @@ -972,7 +972,7 @@ public void readVectored(List ranges, maxReadSizeForVectorReads()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); - for(CombinedFileRange combinedFileRange: combinedFileRanges) { + for (CombinedFileRange combinedFileRange: combinedFileRanges) { unboundedThreadPool.submit( () -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate)); } @@ -990,6 +990,8 @@ public void readVectored(List ranges, private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, IntFunction allocate) { LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); + // This reference must be kept until all buffers are populated as this is a + // finalizable object which closes the internal stream when GC triggers. S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { @@ -1177,9 +1179,9 @@ private void readByteArray(S3ObjectInputStream objectContent, /** * Read data from S3 using a http request with retries. - * This also handles if file has been changed while http - * call is getting executed. If file has been changed - * RemoteFileChangedException is thrown. + * This also handles the case where the file has been changed + * while the http call is executing. If the file has been + * changed, RemoteFileChangedException is thrown. * @param operationName name of the operation for which get object on S3 is called. * @param position position of the object to be read from S3. * @param length length from position of the object to be read from S3. @@ -1196,10 +1198,10 @@ private S3Object getS3Object(String operationName, long position, Invoker invoker = context.getReadInvoker(); try { objectRange = invoker.retry(operationName, pathStr, true, - () -> { + () -> { checkIfVectoredIOStopped(); return client.getObject(request); - }); + }); } catch (IOException ex) { tracker.failed(); @@ -1212,6 +1214,13 @@ private S3Object getS3Object(String operationName, long position, return objectRange; } + /** + * Check if the vectored io operation has been stopped. This happens + * when the stream is closed or unbuffer is called. + * @throws InterruptedIOException thrown so + * that all running vectored io is + * terminated, thus releasing resources.
+ */ private void checkIfVectoredIOStopped() throws InterruptedIOException { if (stopVectoredIOOperations.get()) { throw new InterruptedIOException("Stream closed or unbuffer is called"); @@ -1303,6 +1312,10 @@ public static long validateReadahead(@Nullable Long readahead) { /** * Closes the underlying S3 stream, and merges the {@link #streamStatistics} * instance associated with the stream. + * Also sets the {@code stopVectoredIOOperations} flag to true such that + * active vectored read operations are terminated. However, termination of + * old vectored reads is not guaranteed if a new vectored read operation + * is initiated after unbuffer is called. */ @Override public synchronized void unbuffer() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index efc84a00e6535..6d46bc8245451 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.EOFException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; @@ -27,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; -import org.apache.hadoop.fs.impl.FileRangeImpl; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -58,8 +58,8 @@ protected AbstractFSContract createContract(Configuration conf) { public void testEOFRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = new ArrayList<>(); - fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); - testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); + fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100)); + testExceptionalVectoredRead(fs, fileRanges, EOFException.class); } @Test @@ -106,16 +106,16 @@ public void testMinSeekAndMaxSizeDefaultValues() throws Exception { @Test public void testStopVectoredIoOperationsCloseStream() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeRandomRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ - in.readVectored(fileRanges, allocate); + in.readVectored(fileRanges, getAllocate()); in.close(); LambdaTestUtils.intercept(InterruptedIOException.class, - () -> validateVectoredReadResult(fileRanges, DATASET)); + () -> validateVectoredReadResult(fileRanges, DATASET)); } // reopening the stream should succeed.
try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ - in.readVectored(fileRanges, allocate); + in.readVectored(fileRanges, getAllocate()); validateVectoredReadResult(fileRanges, DATASET); } } @@ -123,14 +123,14 @@ public void testStopVectoredIoOperationsCloseStream() throws Exception { @Test public void testStopVectoredIoOperationsUnbuffer() throws Exception { FileSystem fs = getFileSystem(); - List fileRanges = createSomeRandomRanges(); + List fileRanges = createSampleNonOverlappingRanges(); try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))){ - in.readVectored(fileRanges, allocate); + in.readVectored(fileRanges, getAllocate()); in.unbuffer(); LambdaTestUtils.intercept(InterruptedIOException.class, - () -> validateVectoredReadResult(fileRanges, DATASET)); + () -> validateVectoredReadResult(fileRanges, DATASET)); // re-initiating the vectored reads after unbuffer should succeed. - in.readVectored(fileRanges, allocate); + in.readVectored(fileRanges, getAllocate()); validateVectoredReadResult(fileRanges, DATASET); } @@ -143,7 +143,7 @@ public void testStopVectoredIoOperationsUnbuffer() throws Exception { public void testOverlappingRanges() throws Exception { FileSystem fs = getFileSystem(); List fileRanges = getSampleOverlappingRanges(); - validateUnsupportedOperation(fs, fileRanges); + testExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); } /** @@ -154,6 +154,6 @@ public void testSameRanges() throws Exception { // Same ranges are special case of overlapping only. FileSystem fs = getFileSystem(); List fileRanges = getSampleSameRanges(); - validateUnsupportedOperation(fs, fileRanges); + testExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java index 6e54f4523498a..ffb7b53653f9e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -459,13 +459,13 @@ public void test_040_PositionedReadHugeFile() throws Throwable { public void test_045_vectoredIOHugeFile() throws Throwable { assumeHugeFileExists(); List rangeList = new ArrayList<>(); - rangeList.add(new FileRangeImpl(5856368, 116770)); - rangeList.add(new FileRangeImpl(3520861, 116770)); - rangeList.add(new FileRangeImpl(8191913, 116770)); - rangeList.add(new FileRangeImpl(1520861, 116770)); - rangeList.add(new FileRangeImpl(2520861, 116770)); - rangeList.add(new FileRangeImpl(9191913, 116770)); - rangeList.add(new FileRangeImpl(2820861, 156770)); + rangeList.add(FileRange.createFileRange(5856368, 116770)); + rangeList.add(FileRange.createFileRange(3520861, 116770)); + rangeList.add(FileRange.createFileRange(8191913, 116770)); + rangeList.add(FileRange.createFileRange(1520861, 116770)); + rangeList.add(FileRange.createFileRange(2520861, 116770)); + rangeList.add(FileRange.createFileRange(9191913, 116770)); + rangeList.add(FileRange.createFileRange(2820861, 156770)); IntFunction allocate = ByteBuffer::allocate; FileSystem fs = getFileSystem(); CompletableFuture builder = diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java 
index 3b39ef2e20f57..631842f78e20d 100644
--- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
@@ -107,7 +107,7 @@ public void asyncRead(FileSystemChoice fsChoice,
     FSDataInputStream stream = fsChoice.fs.open(DATA_PATH);
     List<FileRange> ranges = new ArrayList<>();
     for(int m=0; m < 100; ++m) {
-      FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE);
+      FileRange range = FileRange.createFileRange(m * SEEK_SIZE, READ_SIZE);
       ranges.add(range);
     }
     stream.readVectored(ranges, bufferChoice.allocate);

From e9fa4b4286aa8846d1760feee1dbb53ee61d26e8 Mon Sep 17 00:00:00 2001
From: Mukund Thakur
Date: Thu, 16 Jun 2022 16:42:16 -0500
Subject: [PATCH 4/6] documentation and checkstyle

---
 .../src/site/markdown/filesystem/fsdatainputstream.md  | 11 +++++++++--
 .../fs/contract/AbstractContractVectoredReadTest.java  |  5 +++--
 .../localfs/TestLocalFSContractVectoredRead.java       |  6 +++---
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java        |  1 -
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index e4a2830967ebb..ab3e8e32660cf 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -449,7 +449,14 @@ Read fully data for a list of ranges asynchronously. The default implementation
 iterates through the ranges, tries to coalesce the ranges based on values of
 `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged
 ranges synchronously, but the intent is sub classes can implement efficient
-implementation.
+implementation. Reading into both direct and heap byte buffers is supported.
+Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
+allocating buffers such that even direct buffers are garbage collected when
+they are no longer referenced.
+
+Note: Don't use direct buffers for reading from `ChecksumFileSystem` as that may
+lead to the memory fragmentation explained in HADOOP-18296.
+

 #### Preconditions

@@ -467,7 +474,7 @@ For each requested range:

 ### `minSeekForVectorReads()`

-Smallest reasonable seek. Two ranges won't be merged together if the difference between
+The smallest reasonable seek. Two ranges won't be merged together if the difference between
 end of first and start of next range is more than this value.
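Concretely, the coalescing decision for two adjacent sorted ranges reduces to a
check like the sketch below. This is illustrative only, not the actual
`VectoredReadUtils` logic (which also sorts and validates the ranges first);
`maxReadSize` is the limit described in the next section.

```java
// Illustrative sketch: merge two sorted ranges only when the seek between
// them is small enough and the combined read stays within the size limit.
boolean shouldMerge(FileRange prev, FileRange next,
    int minSeek, int maxReadSize) {
  long gap = next.getOffset() - (prev.getOffset() + prev.getLength());
  long mergedSize = (next.getOffset() + next.getLength()) - prev.getOffset();
  return gap <= minSeek && mergedSize <= maxReadSize;
}
```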
### `maxReadSizeForVectorReads()`

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index b8d225d49c12a..3789cdff349fd 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -62,7 +62,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
   private final IntFunction<ByteBuffer> allocate;

-  private final WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
+  private final WeakReferencedElasticByteBufferPool pool =
+          new WeakReferencedElasticByteBufferPool();

   private final String bufferType;

@@ -397,7 +398,7 @@ protected void testExceptionalVectoredRead(FileSystem fs,
         .build();
     try (FSDataInputStream in = builder.get()) {
       LambdaTestUtils.intercept(clazz,
-          () -> in.readVectored(fileRanges, allocate));
+              () -> in.readVectored(fileRanges, allocate));
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index 934de6eca38e8..87dd299768c51 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -54,7 +54,7 @@ protected AbstractFSContract createContract(Configuration conf) {
   public void testChecksumValidationDuringVectoredRead() throws Exception {
     Path testPath = path("big_range_checksum");
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
-    byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    final byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
     try (FSDataOutputStream out = localFs.create(testPath, true)){
       out.write(DATASET_CORRECT);
     }
@@ -70,7 +70,7 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
       in.readVectored(someRandomRanges, getAllocate());
       validateVectoredReadResult(someRandomRanges, DATASET_CORRECT);
     }
-    byte[] DATASET_CORRUPTED = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
+    final byte[] DATASET_CORRUPTED = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
     try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
       out.write(DATASET_CORRUPTED);
     }
@@ -80,7 +80,7 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
       // Expect checksum exception when data is updated directly through
       // raw local fs instance.
       intercept(ChecksumException.class,
-          () -> validateVectoredReadResult(someRandomRanges, DATASET_CORRUPTED));
+              () -> validateVectoredReadResult(someRandomRanges, DATASET_CORRUPTED));
     }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index ffb7b53653f9e..f8d47011de3f0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.impl.FileRangeImpl;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

From a848052ff21f1799089729741704ec4fb43d0e80 Mon Sep 17 00:00:00 2001
From: Mukund Thakur
Date: Fri, 17 Jun 2022 14:47:18 -0500
Subject: [PATCH 5/6] Yetus

---
 .../site/markdown/filesystem/fsdatainputstream.md |  2 +-
 .../localfs/TestLocalFSContractVectoredRead.java  | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index ab3e8e32660cf..197b999c81f66 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -450,7 +450,7 @@ iterates through the ranges, tries to coalesce the ranges based on values of
 `minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged
 ranges synchronously, but the intent is sub classes can implement efficient
 implementation. Reading into both direct and heap byte buffers is supported.
-Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for 
+Also, clients are encouraged to use `WeakReferencedElasticByteBufferPool` for
 allocating buffers such that even direct buffers are garbage collected when
 they are no longer referenced.
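The buffer-pool guidance above corresponds to a usage pattern like the
following sketch. It is an illustration under stated assumptions, not code
from this patch: the class and method names are hypothetical, the offsets are
arbitrary, and error handling is reduced to a plain `throws`.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;

    /** Hypothetical example of vectored reads with pooled buffers. */
    class PooledVectoredReadSketch {
      void readWithPool(FileSystem fs, Path path) throws Exception {
        ByteBufferPool pool = new WeakReferencedElasticByteBufferPool();
        List<FileRange> ranges = new ArrayList<>();
        ranges.add(FileRange.createFileRange(0, 65536));
        ranges.add(FileRange.createFileRange(1048576, 65536));
        try (FSDataInputStream in = fs.open(path)) {
          // true => direct buffers; prefer heap buffers (false) when reading
          // through ChecksumFileSystem, per the note in the documentation.
          in.readVectored(ranges, length -> pool.getBuffer(true, length));
          for (FileRange range : ranges) {
            ByteBuffer buffer = range.getData().get(); // wait for this range
            // ... consume the buffer ...
            pool.putBuffer(buffer); // return it so the memory can be reused
          }
        }
      }
    }

Returning each buffer via `putBuffer` is what lets the weakly referenced pool
release direct memory once the buffers are no longer needed.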
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index 87dd299768c51..5d6ca3f8f0c90 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -54,9 +54,9 @@ protected AbstractFSContract createContract(Configuration conf) {
   public void testChecksumValidationDuringVectoredRead() throws Exception {
     Path testPath = path("big_range_checksum");
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
-    final byte[] DATASET_CORRECT = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
+    final byte[] datasetCorrect = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
     try (FSDataOutputStream out = localFs.create(testPath, true)){
-      out.write(DATASET_CORRECT);
+      out.write(datasetCorrect);
     }
     Path checksumPath = localFs.getChecksumFile(testPath);
     Assertions.assertThat(localFs.exists(checksumPath))
@@ -68,11 +68,11 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
     someRandomRanges.add(FileRange.createFileRange(1025, 1024));
     try (FSDataInputStream in = fis.get()){
       in.readVectored(someRandomRanges, getAllocate());
-      validateVectoredReadResult(someRandomRanges, DATASET_CORRECT);
+      validateVectoredReadResult(someRandomRanges, datasetCorrect);
     }
-    final byte[] DATASET_CORRUPTED = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
+    final byte[] datasetCorrupted = ContractTestUtils.dataset(DATASET_LEN, 'a', 64);
     try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
-      out.write(DATASET_CORRUPTED);
+      out.write(datasetCorrupted);
     }
     CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
     try (FSDataInputStream in = fisN.get()){
@@ -80,7 +80,7 @@ public void testChecksumValidationDuringVectoredRead() throws Exception {
       // Expect checksum exception when data is updated directly through
       // raw local fs instance.
       intercept(ChecksumException.class,
-          () -> validateVectoredReadResult(someRandomRanges, DATASET_CORRUPTED));
+          () -> validateVectoredReadResult(someRandomRanges, datasetCorrupted));
     }
   }
 }

From ffde6459d00acb4b24f7f352f47fbe2b981ddbd6 Mon Sep 17 00:00:00 2001
From: Mukund Thakur
Date: Mon, 20 Jun 2022 12:47:35 -0500
Subject: [PATCH 6/6] minor fixes

---
 .../contract/AbstractContractVectoredReadTest.java | 12 +++++++-----
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  2 +-
 .../contract/s3a/ITestS3AContractVectoredRead.java |  6 +++---
 3 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 3789cdff349fd..77bcc496ff4a2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -296,7 +296,7 @@ public void testNegativeLengthRange() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(FileRange.createFileRange(0, -50));
-    testExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, IllegalArgumentException.class);
   }

   @Test
@@ -304,7 +304,7 @@ public void testNegativeOffsetRange() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(FileRange.createFileRange(-1, 50));
-    testExceptionalVectoredRead(fs, fileRanges, EOFException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
   }

   @Test
@@ -390,9 +390,11 @@ protected List<FileRange> getSampleOverlappingRanges() {
    * @param clazz type of exception expected.
    * @throws Exception any other IOE.
    */
-  protected void testExceptionalVectoredRead(FileSystem fs,
-                                             List<FileRange> fileRanges,
-                                             Class<? extends Exception> clazz) throws Exception {
+  protected void verifyExceptionalVectoredRead(
+      FileSystem fs,
+      List<FileRange> fileRanges,
+      Class<? extends Exception> clazz) throws Exception {
+
     CompletableFuture<FSDataInputStream> builder =
         fs.openFile(path(VECTORED_READ_FILE_NAME))
             .build();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index e5c279e22e139..3069f17289119 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1007,7 +1007,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRa
       }
       populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
-      LOG.warn("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
+      LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
       for(FileRange child : combinedFileRange.getUnderlying()) {
         child.getData().completeExceptionally(ex);
       }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 6d46bc8245451..18a727dcdceed 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -59,7 +59,7 @@ public void testEOFRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
     fileRanges.add(FileRange.createFileRange(DATASET_LEN, 100));
-    testExceptionalVectoredRead(fs, fileRanges, EOFException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, EOFException.class);
   }

   @Test
@@ -143,7 +143,7 @@ public void testStopVectoredIoOperationsUnbuffer() throws Exception {
   public void testOverlappingRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = getSampleOverlappingRanges();
-    testExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
   }

   /**
@@ -154,6 +154,6 @@ public void testSameRanges() throws Exception {
     // Same ranges are special case of overlapping only.
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = getSampleSameRanges();
-    testExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
+    verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
   }
 }
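Taken together, the close/unbuffer handling in this series follows the
stop-flag pattern sketched below. The flag and check-method names match the
patch; everything else is schematic and not the actual S3AInputStream code.

    import java.io.InterruptedIOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    /** Schematic stream showing the stop-flag handshake. */
    class StopFlagSketch {
      private final AtomicBoolean stopVectoredIOOperations = new AtomicBoolean(false);

      /** A new vectored read re-arms the flag, which is why termination of
       * reads issued before unbuffer() is not guaranteed once a new read starts. */
      void startVectoredRead() {
        stopVectoredIOOperations.set(false);
        // ... submit each (possibly merged) range to an executor, calling
        // checkIfVectoredIOStopped() before each remote fetch ...
      }

      /** unbuffer()/close() ask in-flight vectored reads to terminate. */
      synchronized void unbuffer() {
        stopVectoredIOOperations.set(true);
        // ... close the wrapped stream and merge stream statistics ...
      }

      private void checkIfVectoredIOStopped() throws InterruptedIOException {
        if (stopVectoredIOOperations.get()) {
          throw new InterruptedIOException("Stream closed or unbuffer is called");
        }
      }
    }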