diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java index 59345f5d25caf..7f3171235c8f4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,6 +22,9 @@ import java.io.FileDescriptor; import java.io.IOException; import java.util.StringJoiner; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -158,8 +161,24 @@ public IOStatistics getIOStatistics() { @Override public String toString() { return new StringJoiner(", ", - BufferedFSInputStream.class.getSimpleName() + "[", "]") - .add("in=" + in) - .toString(); + BufferedFSInputStream.class.getSimpleName() + "[", "]") + .add("in=" + in) + .toString(); + } + + @Override + public int minSeekForVectorReads() { + return ((PositionedReadable) in).minSeekForVectorReads(); + } + + @Override + public int maxReadSizeForVectorReads() { + return ((PositionedReadable) in).maxReadSizeForVectorReads(); + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + ((PositionedReadable) in).readVectored(ranges, allocate); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index c7f8e36c3f675..e525daef5c8bb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -22,18 +22,26 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.IntFunction; +import java.util.zip.CRC32; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl; +import org.apache.hadoop.fs.impl.VectoredReadUtils; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; import org.apache.hadoop.fs.impl.OpenFileParameters; import org.apache.hadoop.fs.permission.AclEntry; @@ -66,7 +74,7 @@ public abstract class ChecksumFileSystem extends FilterFileSystem { public static double getApproxChkSumLength(long size) { return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size; } - + public ChecksumFileSystem(FileSystem fs) { super(fs); } @@ -82,7 +90,7 @@ public void setConf(Configuration conf) { bytesPerChecksum); } } - + /** * Set whether to verify checksum. */ @@ -95,7 +103,7 @@ public void setVerifyChecksum(boolean verifyChecksum) { public void setWriteChecksum(boolean writeChecksum) { this.writeChecksum = writeChecksum; } - + /** get the raw file system */ @Override public FileSystem getRawFileSystem() { @@ -113,7 +121,7 @@ public static boolean isChecksumFile(Path file) { return name.startsWith(".") && name.endsWith(".crc"); } - /** Return the length of the checksum file given the size of the + /** Return the length of the checksum file given the size of the * actual file. **/ public long getChecksumFileLength(Path file, long fileSize) { @@ -143,18 +151,18 @@ private static class ChecksumFSInputChecker extends FSInputChecker implements private ChecksumFileSystem fs; private FSDataInputStream datas; private FSDataInputStream sums; - + private static final int HEADER_LENGTH = 8; - + private int bytesPerSum = 1; - + public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file) throws IOException { this(fs, file, fs.getConf().getInt( - LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, + LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY, LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT)); } - + public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) throws IOException { super( file, fs.getFileStatus(file).getReplication() ); @@ -170,7 +178,8 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) if (!Arrays.equals(version, CHECKSUM_VERSION)) throw new IOException("Not a checksum file: "+sumFile); this.bytesPerSum = sums.readInt(); - set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, 4); + set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum, + FSInputChecker.CHECKSUM_SIZE); } catch (IOException e) { // mincing the message is terrible, but java throws permission // exceptions as FNF because that's all the method signatures allow! 
@@ -182,21 +191,21 @@ public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize) set(fs.verifyChecksum, null, 1, 0); } } - + private long getChecksumFilePos( long dataPos ) { - return HEADER_LENGTH + 4*(dataPos/bytesPerSum); + return HEADER_LENGTH + FSInputChecker.CHECKSUM_SIZE*(dataPos/bytesPerSum); } - + @Override protected long getChunkPosition( long dataPos ) { return dataPos/bytesPerSum*bytesPerSum; } - + @Override public int available() throws IOException { return datas.available() + super.available(); } - + @Override public int read(long position, byte[] b, int off, int len) throws IOException { @@ -214,7 +223,7 @@ public int read(long position, byte[] b, int off, int len) } return nread; } - + @Override public void close() throws IOException { datas.close(); @@ -223,7 +232,7 @@ public void close() throws IOException { } set(fs.verifyChecksum, null, 1, 0); } - + @Override public boolean seekToNewSource(long targetPos) throws IOException { @@ -246,7 +255,7 @@ protected int readChunk(long pos, byte[] buf, int offset, int len, final int checksumsToRead = Math.min( len/bytesPerSum, // number of checksums based on len to read checksum.length / CHECKSUM_SIZE); // size of checksum buffer - long checksumPos = getChecksumFilePos(pos); + long checksumPos = getChecksumFilePos(pos); if(checksumPos != sums.getPos()) { sums.seek(checksumPos); } @@ -286,8 +295,129 @@ protected int readChunk(long pos, byte[] buf, int offset, int len, public IOStatistics getIOStatistics() { return IOStatisticsSupport.retrieveIOStatistics(datas); } + + public static long findChecksumOffset(long dataOffset, + int bytesPerSum) { + return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE; + } + + /** + * Find the checksum ranges that correspond to the given data ranges. + * @param dataRanges the input data ranges, which are assumed to be sorted + * and non-overlapping + * @return a list of AsyncReaderUtils.CombinedFileRange that correspond to + * the checksum ranges + */ + public static List findChecksumRanges( + List dataRanges, + int bytesPerSum, + int minSeek, + int maxSize) { + List result = new ArrayList<>(); + CombinedFileRange currentCrc = null; + for(FileRange range: dataRanges) { + long crcOffset = findChecksumOffset(range.getOffset(), bytesPerSum); + long crcEnd = findChecksumOffset(range.getOffset() + range.getLength() + + bytesPerSum - 1, bytesPerSum); + if (currentCrc == null || + !currentCrc.merge(crcOffset, crcEnd, range, minSeek, maxSize)) { + currentCrc = new CombinedFileRange(crcOffset, crcEnd, range); + result.add(currentCrc); + } + } + return result; + } + + /** + * Check the data against the checksums. 
+ * @param sumsBytes the checksum data + * @param sumsOffset where from the checksum file this buffer started + * @param data the file data + * @param dataOffset where the file data started (must be a multiple of + * bytesPerSum) + * @param bytesPerSum how many bytes per a checksum + * @param file the path of the filename + * @return the data buffer + * @throws CompletionException if the checksums don't match + */ + static ByteBuffer checkBytes(ByteBuffer sumsBytes, + long sumsOffset, + ByteBuffer data, + long dataOffset, + int bytesPerSum, + Path file) { + // determine how many bytes we need to skip at the start of the sums + int offset = + (int) (findChecksumOffset(dataOffset, bytesPerSum) - sumsOffset); + IntBuffer sums = sumsBytes.asIntBuffer(); + sums.position(offset / FSInputChecker.CHECKSUM_SIZE); + ByteBuffer current = data.duplicate(); + int NUM_CHUNKS = data.remaining() / bytesPerSum; + CRC32 crc = new CRC32(); + // check each chunk to ensure they match + for(int c = 0; c < NUM_CHUNKS; ++c) { + // set the buffer position and the limit + current.limit((c + 1) * bytesPerSum); + current.position(c * bytesPerSum); + // compute the crc + crc.reset(); + crc.update(current); + int expected = sums.get(); + int calculated = (int) crc.getValue(); + + if (calculated != expected) { + // cast of c added to silence findbugs + long errPosn = dataOffset + (long) c * bytesPerSum; + throw new CompletionException(new ChecksumException( + "Checksum error: " + file + " at " + errPosn + + " exp: " + expected + " got: " + calculated, errPosn)); + } + } + // if everything matches, we return the data + return data; + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + // If the stream doesn't have checksums, just delegate. + VectoredReadUtils.validateVectoredReadRanges(ranges); + if (sums == null) { + datas.readVectored(ranges, allocate); + return; + } + int minSeek = minSeekForVectorReads(); + int maxSize = maxReadSizeForVectorReads(); + List dataRanges = + VectoredReadUtils.sortAndMergeRanges(ranges, bytesPerSum, + minSeek, maxReadSizeForVectorReads()); + List checksumRanges = findChecksumRanges(dataRanges, + bytesPerSum, minSeek, maxSize); + sums.readVectored(checksumRanges, allocate); + datas.readVectored(dataRanges, allocate); + // Data read is correct. I have verified content of dataRanges. + // There is some bug below here as test (testVectoredReadMultipleRanges) + // is failing, should be + // somewhere while slicing the merged data into smaller user ranges. + // Spend some time figuring out but it is a complex code. 
+ for(CombinedFileRange checksumRange: checksumRanges) { + for(FileRange dataRange: checksumRange.getUnderlying()) { + // when we have both the ranges, validate the checksum + CompletableFuture result = + checksumRange.getData().thenCombineAsync(dataRange.getData(), + (sumBuffer, dataBuffer) -> + checkBytes(sumBuffer, checksumRange.getOffset(), + dataBuffer, dataRange.getOffset(), bytesPerSum, file)); + // Now, slice the read data range to the user's ranges + for(FileRange original: ((CombinedFileRange) dataRange).getUnderlying()) { + original.setData(result.thenApply( + (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original))); + } + } + } + } } - + private static class FSDataBoundedInputStream extends FSDataInputStream { private FileSystem fs; private Path file; @@ -298,12 +428,12 @@ private static class FSDataBoundedInputStream extends FSDataInputStream { this.fs = fs; this.file = file; } - + @Override public boolean markSupported() { return false; } - + /* Return the file length */ private long getFileLength() throws IOException { if( fileLen==-1L ) { @@ -311,7 +441,7 @@ private long getFileLength() throws IOException { } return fileLen; } - + /** * Skips over and discards n bytes of data from the * input stream. @@ -335,11 +465,11 @@ public synchronized long skip(long n) throws IOException { } return super.skip(n); } - + /** * Seek to the given position in the stream. * The next read() will be from that position. - * + * *
<p>
This method does not allow seek past the end of the file. * This produces IOException. * @@ -404,22 +534,22 @@ public void concat(final Path f, final Path[] psrcs) throws IOException { */ public static long getChecksumLength(long size, int bytesPerSum) { //the checksum length is equal to size passed divided by bytesPerSum + - //bytes written in the beginning of the checksum file. - return ((size + bytesPerSum - 1) / bytesPerSum) * 4 + - CHECKSUM_VERSION.length + 4; + //bytes written in the beginning of the checksum file. + return ((size + bytesPerSum - 1) / bytesPerSum) * FSInputChecker.CHECKSUM_SIZE + + ChecksumFSInputChecker.HEADER_LENGTH; } /** This class provides an output stream for a checksummed file. * It generates checksums for data. */ private static class ChecksumFSOutputSummer extends FSOutputSummer implements IOStatisticsSource, StreamCapabilities { - private FSDataOutputStream datas; + private FSDataOutputStream datas; private FSDataOutputStream sums; private static final float CHKSUM_AS_FRACTION = 0.01f; private boolean isClosed = false; - - public ChecksumFSOutputSummer(ChecksumFileSystem fs, - Path file, + + public ChecksumFSOutputSummer(ChecksumFileSystem fs, + Path file, boolean overwrite, int bufferSize, short replication, @@ -440,7 +570,7 @@ public ChecksumFSOutputSummer(ChecksumFileSystem fs, sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length); sums.writeInt(bytesPerSum); } - + @Override public void close() throws IOException { try { @@ -451,7 +581,7 @@ public void close() throws IOException { isClosed = true; } } - + @Override protected void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) @@ -707,7 +837,7 @@ public boolean rename(Path src, Path dst) throws IOException { value = fs.rename(srcCheckFile, dstCheckFile); } else if (fs.exists(dstCheckFile)) { // no src checksum, so remove dst checksum - value = fs.delete(dstCheckFile, true); + value = fs.delete(dstCheckFile, true); } return value; @@ -739,7 +869,7 @@ public boolean delete(Path f, boolean recursive) throws IOException{ return fs.delete(f, true); } } - + final private static PathFilter DEFAULT_FILTER = new PathFilter() { @Override public boolean accept(Path file) { @@ -750,7 +880,7 @@ public boolean accept(Path file) { /** * List the statuses of the files/directories in the given path if the path is * a directory. - * + * * @param f * given path * @return the statuses of the files/directories in the given path @@ -771,7 +901,7 @@ public RemoteIterator listStatusIterator(final Path p) /** * List the statuses of the files/directories in the given path if the path is * a directory. 
- * + * * @param f * given path * @return the statuses of the files/directories in the given patch @@ -782,7 +912,7 @@ public RemoteIterator listLocatedStatus(Path f) throws IOException { return fs.listLocatedStatus(f, DEFAULT_FILTER); } - + @Override public boolean mkdirs(Path f) throws IOException { return fs.mkdirs(f); @@ -832,7 +962,7 @@ public void copyToLocalFile(Path src, Path dst, boolean copyCrc) } else { FileStatus[] srcs = listStatus(src); for (FileStatus srcFile : srcs) { - copyToLocalFile(srcFile.getPath(), + copyToLocalFile(srcFile.getPath(), new Path(dst, srcFile.getPath().getName()), copyCrc); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index b143a4cb63d19..52644402ca459 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -26,6 +26,8 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.EnumSet; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -51,7 +53,7 @@ public class FSDataInputStream extends DataInputStream */ private final IdentityHashStore extendedReadBuffers - = new IdentityHashStore(0); + = new IdentityHashStore<>(0); public FSDataInputStream(InputStream in) { super(in); @@ -279,4 +281,20 @@ public void readFully(long position, ByteBuffer buf) throws IOException { public IOStatistics getIOStatistics() { return IOStatisticsSupport.retrieveIOStatistics(in); } + + @Override + public int minSeekForVectorReads() { + return ((PositionedReadable) in).minSeekForVectorReads(); + } + + @Override + public int maxReadSizeForVectorReads() { + return ((PositionedReadable) in).maxReadSizeForVectorReads(); + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + ((PositionedReadable) in).readVectored(ranges, allocate); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java new file mode 100644 index 0000000000000..416f419ad7830 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * A byte range of a file. + * This is used for the asynchronous gather read API of + * {@link PositionedReadable#readVectored}. + */ +public interface FileRange { + /** + * Get the starting offset of the range + * @return the byte offset of the start + */ + long getOffset(); + + /** + * Get the length of the range. + * @return the number of bytes in the range. + */ + int getLength(); + + /** + * Get the future data for this range. + * @return the future for the {@link ByteBuffer} that contains the data + */ + CompletableFuture getData(); + + /** + * Set a future for this range's data. + * This method is called by {@link PositionedReadable#readVectored} to store the + * data for the user to pick up later via {@link #getData}. + * @param data the future of the ByteBuffer that will have the data + */ + void setData(CompletableFuture data); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java new file mode 100644 index 0000000000000..a844dfc574442 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRangeImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +/** + * A range of bytes from a file with an optional buffer to read those bytes + * for zero copy. 
+ */ +public class FileRangeImpl implements FileRange { + protected long offset; + protected int length; + private CompletableFuture reader; + + public FileRangeImpl(long offset, int length) { + this.offset = offset; + this.length = length; + } + + @Override + public String toString() { + return "range[" + offset + "," + (offset + length) + ")"; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public int getLength() { + return length; + } + + @Override + public void setData(CompletableFuture reader) { + this.reader = reader; + } + + @Override + public CompletableFuture getData() { + return reader; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index 6744d17a72666..7e543ebf22669 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,10 +17,15 @@ */ package org.apache.hadoop.fs; -import java.io.*; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.impl.VectoredReadUtils; /** * Stream that permits positional reading. @@ -85,4 +90,38 @@ void readFully(long position, byte[] buffer, int offset, int length) * the read operation completed */ void readFully(long position, byte[] buffer) throws IOException; + + /** + * What is the smallest reasonable seek? + * @return the minimum number of bytes + */ + default int minSeekForVectorReads() { + return 4 * 1024; + } + + /** + * What is the largest size that we should group ranges together as? + * @return the number of bytes to read at once + */ + default int maxReadSizeForVectorReads() { + return 1024 * 1024; + } + + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that FSDataInputStream subclasses can make more efficient + * readers. + * As a result of the call, each range will have FileRange.setData(CompletableFuture) + * called with a future that when complete will have a ByteBuffer with the + * data from the file's range. + * @param ranges the byte ranges to read + * @param allocate the function to allocate ByteBuffer + * @throws IOException any IOE. 
+ */ + default void readVectored(List ranges, + IntFunction allocate) throws IOException { + VectoredReadUtils.readVectored(this, ranges, allocate, minSeekForVectorReads(), + maxReadSizeForVectorReads()); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index edcc4a8b99e77..ef0abce4ceac4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,6 +20,7 @@ package org.apache.hadoop.fs; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.VectoredReadUtils; import java.io.BufferedOutputStream; import java.io.DataOutput; @@ -33,8 +34,11 @@ import java.io.FileDescriptor; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; import java.nio.file.Files; import java.nio.file.NoSuchFileException; +import java.nio.file.StandardOpenOption; import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.FileTime; @@ -44,6 +48,9 @@ import java.util.Optional; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicLong; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -125,7 +132,9 @@ public void initialize(URI uri, Configuration conf) throws IOException { class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor, IOStatisticsSource, StreamCapabilities { private FileInputStream fis; + private final File name; private long position; + private AsynchronousFileChannel asyncChannel = null; /** * Minimal set of counters. 
@@ -143,7 +152,8 @@ class LocalFSFileInputStream extends FSInputStream implements private final AtomicLong bytesRead; public LocalFSFileInputStream(Path f) throws IOException { - fis = new FileInputStream(pathToFile(f)); + name = pathToFile(f); + fis = new FileInputStream(name); bytesRead = ioStatistics.getCounterReference( STREAM_READ_BYTES); } @@ -174,10 +184,16 @@ public boolean seekToNewSource(long targetPos) throws IOException { @Override public int available() throws IOException { return fis.available(); } @Override - public void close() throws IOException { fis.close(); } - @Override public boolean markSupported() { return false; } - + + @Override + public void close() throws IOException { + fis.close(); + if (asyncChannel != null) { + asyncChannel.close(); + } + } + @Override public int read() throws IOException { try { @@ -267,8 +283,88 @@ public boolean hasCapability(String capability) { public IOStatistics getIOStatistics() { return ioStatistics; } + + AsynchronousFileChannel getAsyncChannel() throws IOException { + if (asyncChannel == null) { + synchronized (this) { + asyncChannel = AsynchronousFileChannel.open(name.toPath(), + StandardOpenOption.READ); + } + } + return asyncChannel; + } + + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + + // Set up all of the futures, so that we can use them if things fail + for(FileRange range: ranges) { + VectoredReadUtils.validateRangeRequest(range); + range.setData(new CompletableFuture<>()); + } + try { + AsynchronousFileChannel channel = getAsyncChannel(); + ByteBuffer[] buffers = new ByteBuffer[ranges.size()]; + AsyncHandler asyncHandler = new AsyncHandler(channel, ranges, buffers); + for(int i = 0; i < ranges.size(); ++i) { + FileRange range = ranges.get(i); + buffers[i] = allocate.apply(range.getLength()); + channel.read(buffers[i], range.getOffset(), i, asyncHandler); + } + } catch (IOException ioe) { + LOG.debug("Exception occurred during vectored read ", ioe); + for(FileRange range: ranges) { + range.getData().completeExceptionally(ioe); + } + } + } } - + + /** + * A CompletionHandler that implements readFully and translates back + * into the form of CompletionHandler that our users expect. + */ + static class AsyncHandler implements CompletionHandler { + private final AsynchronousFileChannel channel; + private final List ranges; + private final ByteBuffer[] buffers; + + AsyncHandler(AsynchronousFileChannel channel, + List ranges, + ByteBuffer[] buffers) { + this.channel = channel; + this.ranges = ranges; + this.buffers = buffers; + } + + @Override + public void completed(Integer result, Integer r) { + FileRange range = ranges.get(r); + ByteBuffer buffer = buffers[r]; + if (result == -1) { + failed(new EOFException("Read past End of File"), r); + } else { + if (buffer.remaining() > 0) { + // issue a read for the rest of the buffer + // QQ: What if this fails? It has the same handler. + channel.read(buffer, range.getOffset() + buffer.position(), r, this); + } else { + // QQ: Why is this required? I think because we don't want the + // user to read data beyond limit. 
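+        // flip() sets the limit to the number of bytes actually read and
+        // rewinds the position to zero, so the caller sees only the filled
+        // portion of the buffer and cannot read past it.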
+ buffer.flip(); + range.getData().complete(buffer); + } + } + } + + @Override + public void failed(Throwable exc, Integer r) { + LOG.debug("Failed while reading range {} ", r, exc); + ranges.get(r).getData().completeExceptionally(exc); + } + } + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { getFileStatus(f); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java new file mode 100644 index 0000000000000..05d844a198577 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; + +import java.util.ArrayList; +import java.util.List; + +/** + * A file range that represents a set of underlying file ranges. + * This is used when we combine the user's FileRange objects + * together into a single read for efficiency. + */ +public class CombinedFileRange extends FileRangeImpl { + private ArrayList underlying = new ArrayList<>(); + + public CombinedFileRange(long offset, long end, FileRange original) { + super(offset, (int) (end - offset)); + this.underlying.add(original); + } + + /** + * Get the list of ranges that were merged together to form this one. + * @return the list of input ranges + */ + public List getUnderlying() { + return underlying; + } + + /** + * Merge this input range into the current one, if it is compatible. + * It is assumed that otherOffset is greater or equal the current offset, + * which typically happens by sorting the input ranges on offset. 
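+   * <p>Illustrative example (hypothetical numbers, not taken from the code):
+   * with minSeek = 4096 and maxSize = 1048576, a combined range covering
+   * [2000, 3000) absorbs an incoming range rounded to [5000, 6000), because
+   * the gap (5000 - 3000 = 2000) is below minSeek and the combined length
+   * (6000 - 2000 = 4000) does not exceed maxSize; the merged range then
+   * covers [2000, 6000).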
+ * @param otherOffset the offset to consider merging + * @param otherEnd the end to consider merging + * @param other the underlying FileRange to add if we merge + * @param minSeek the minimum distance that we'll seek without merging the + * ranges together + * @param maxSize the maximum size that we'll merge into a single range + * @return true if we have merged the range into this one + */ + public boolean merge(long otherOffset, long otherEnd, FileRange other, + int minSeek, int maxSize) { + long end = offset + length; + long newEnd = Math.max(end, otherEnd); + if (otherOffset - end >= minSeek || newEnd - offset > maxSize) { + return false; + } + length = (int) (newEnd - offset); + underlying.add(other); + return true; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java new file mode 100644 index 0000000000000..6d5215f2b1511 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/VectoredReadUtils.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.util.Preconditions; + +public class VectoredReadUtils { + + /** + * @param range + * @throws EOFException + */ + public static void validateRangeRequest(FileRange range) + throws EOFException { + + Preconditions.checkArgument(range.getLength() >= 0, "length is negative"); + if (range.getOffset() < 0) { + throw new EOFException("position is negative"); + } + } + + public static void validateVectoredReadRanges(List ranges) + throws EOFException { + for (FileRange range : ranges) { + validateRangeRequest(range); + } + } + + + + /** + * Read fully a list of file ranges asynchronously from this file. + * The default iterates through the ranges to read each synchronously, but + * the intent is that subclasses can make more efficient readers. + * The data or exceptions are pushed into {@link FileRange#getData()}. 
+ * @param stream the stream to read the data from + * @param ranges the byte ranges to read + * @param allocate the byte buffer allocation + * @param minimumSeek the minimum number of bytes to seek over + * @param maximumRead the largest number of bytes to combine into a single read + */ + public static void readVectored(PositionedReadable stream, + List ranges, + IntFunction allocate, + int minimumSeek, + int maximumRead) { + if (isOrderedDisjoint(ranges, 1, minimumSeek)) { + for(FileRange range: ranges) { + range.setData(readRangeFrom(stream, range, allocate)); + } + } else { + for(CombinedFileRange range: sortAndMergeRanges(ranges, 1, minimumSeek, + maximumRead)) { + CompletableFuture read = + readRangeFrom(stream, range, allocate); + for(FileRange child: range.getUnderlying()) { + child.setData(read.thenApply( + (b) -> sliceTo(b, range.getOffset(), child))); + } + } + } + } + + /** + * Synchronously reads a range from the stream dealing with the combinations + * of ByteBuffers buffers and PositionedReadable streams. + * @param stream the stream to read from + * @param range the range to read + * @param allocate the function to allocate ByteBuffers + * @return the CompletableFuture that contains the read data + */ + public static CompletableFuture readRangeFrom(PositionedReadable stream, + FileRange range, + IntFunction allocate) { + CompletableFuture result = new CompletableFuture<>(); + try { + ByteBuffer buffer = allocate.apply(range.getLength()); + if (stream instanceof ByteBufferPositionedReadable) { + ((ByteBufferPositionedReadable) stream).readFully(range.getOffset(), + buffer); + buffer.flip(); + } else { + readNonByteBufferPositionedReadable(stream, range, buffer); + } + result.complete(buffer); + } catch (IOException ioe) { + result.completeExceptionally(ioe); + } + return result; + } + + private static void readNonByteBufferPositionedReadable(PositionedReadable stream, + FileRange range, + ByteBuffer buffer) throws IOException { + if (buffer.isDirect()) { + buffer.put(readInDirectBuffer(stream, range)); + buffer.flip(); + } else { + stream.readFully(range.getOffset(), buffer.array(), + buffer.arrayOffset(), range.getLength()); + } + } + + private static byte[] readInDirectBuffer(PositionedReadable stream, + FileRange range) throws IOException { + // if we need to read data from a direct buffer and the stream doesn't + // support it, we allocate a byte array to use. + byte[] tmp = new byte[range.getLength()]; + stream.readFully(range.getOffset(), tmp, 0, tmp.length); + return tmp; + } + + /** + * Is the given input list: + *

+   * <ul>
+   *   <li>already sorted by offset</li>
+   *   <li>each range is more than minimumSeek apart</li>
+   *   <li>the start and end of each range is a multiple of chunkSize</li>
+   * </ul>
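+   * <p>For example (illustrative values): with chunkSize = 100 and
+   * minimumSeek = 800, the ranges [1000, 1100) and [2100, 2200), in that
+   * order, satisfy all three conditions; the same ranges in reverse order
+   * do not, because the later range would start before the earlier one ends.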
+ * + * @param input the list of input ranges + * @param chunkSize the size of the chunks that the offset & end must align to + * @param minimumSeek the minimum distance between ranges + * @return true if we can use the input list as is + */ + public static boolean isOrderedDisjoint(List input, + int chunkSize, + int minimumSeek) { + long previous = -minimumSeek; + for(FileRange range: input) { + long offset = range.getOffset(); + long end = range.getOffset() + range.getLength(); + if (offset % chunkSize != 0 || + end % chunkSize != 0 || + (offset - previous < minimumSeek)) { + return false; + } + previous = end; + } + return true; + } + + public static long roundDown(long offset, int chunkSize) { + if (chunkSize > 1) { + return offset - (offset % chunkSize); + } else { + return offset; + } + } + + public static long roundUp(long offset, int chunkSize) { + if (chunkSize > 1) { + long next = offset + chunkSize - 1; + return next - (next % chunkSize); + } else { + return offset; + } + } + + /** + * Sort and merge ranges to optimize the access from the underlying file + * system. + * The motivations are that: + *
+   * <ul>
+   *   <li>Upper layers want to pass down logical file ranges.</li>
+   *   <li>Fewer reads have better performance.</li>
+   *   <li>Applications want callbacks as ranges are read.</li>
+   *   <li>Some file systems want to round ranges to be at checksum boundaries.</li>
+   * </ul>
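+   * <p>As an illustration (the same values are exercised by the unit tests in
+   * this patch): the ranges [3000, 3100), [2100, 2200) and [1000, 1100), with
+   * chunkSize = 100, minimumSeek = 1001 and maxSize = 2500, are merged into a
+   * single combined range [1000, 3100) whose underlying list holds the three
+   * original ranges.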
+ * + * @param input the list of input ranges + * @param chunkSize round the start and end points to multiples of chunkSize + * @param minimumSeek the smallest gap that we should seek over in bytes + * @param maxSize the largest combined file range in bytes + * @return the list of sorted CombinedFileRanges that cover the input + */ + public static List sortAndMergeRanges(List input, + int chunkSize, + int minimumSeek, + int maxSize) { + // sort the ranges by offset + FileRange[] ranges = input.toArray(new FileRange[0]); + Arrays.sort(ranges, Comparator.comparingLong(FileRange::getOffset)); + CombinedFileRange current = null; + List result = new ArrayList<>(ranges.length); + + // now merge together the ones that merge + for(FileRange range: ranges) { + long start = roundDown(range.getOffset(), chunkSize); + long end = roundUp(range.getOffset() + range.getLength(), chunkSize); + if (current == null || !current.merge(start, end, range, minimumSeek, maxSize)) { + current = new CombinedFileRange(start, end, range); + result.add(current); + } + } + return result; + } + + /** + * Slice the data that was read to the user's request. + * This function assumes that the user's request is completely subsumed by the + * read data. + * @param readData the buffer with the readData + * @param readOffset the offset in the file for the readData + * @param request the user's request + * @return the readData buffer that is sliced to the user's request + */ + public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset, + FileRange request) { + int offsetChange = (int) (request.getOffset() - readOffset); + int requestLength = request.getLength(); + // If we need to change the offset or length, make a copy and do it + if (offsetChange != 0 || readData.remaining() != requestLength) { + readData = readData.slice(); + readData.position(offsetChange); + readData.limit(offsetChange + requestLength); + } + return readData; + } + + private VectoredReadUtils() { + throw new UnsupportedOperationException(); + } +} diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 090696483be34..02d6765bd4c53 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -443,6 +443,37 @@ The semantics of this are exactly equivalent to That is, the buffer is filled entirely with the contents of the input source from position `position` +### `default void readVectored(List ranges, IntFunction allocate)` + +Read fully data for a list of ranges asynchronously. The default implementation +iterates through the ranges, tries to coalesce the ranges based on values of +`minSeekForVectorReads` and `maxReadSizeForVectorReads` and then read each merged +ranges synchronously, but the intent is sub classes can implement efficient +implementation. + +#### Preconditions + +For each requested range: + + range.getOffset >= 0 else raise IllegalArgumentException + range.getLength >= 0 else raise EOFException + +#### Postconditions + +For each requested range: + + range.getData() returns CompletableFuture which will have data + from range.getOffset to range.getLength. + +### `minSeekForVectorReads()` + +Smallest reasonable seek. Two ranges won't be merged together if the difference between +end of first and start of next range is more than this value. 
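+A minimal usage sketch (the file path, offsets and the blocking `get()` call
+are illustrative only, not part of the contract; the contract tests use
+`FutureIOSupport.awaitFuture()` instead of `get()`):
+
+    // fs is an open FileSystem, path an existing file
+    List<FileRange> ranges = new ArrayList<>();
+    ranges.add(new FileRangeImpl(0, 100));       // bytes [0, 100)
+    ranges.add(new FileRangeImpl(4096, 100));    // bytes [4096, 4196)
+    try (FSDataInputStream in = fs.open(path)) {
+      in.readVectored(ranges, ByteBuffer::allocate);
+      for (FileRange range : ranges) {
+        ByteBuffer data = range.getData().get(); // waits for this range
+        // ... consume data ...
+      }
+    }
+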
+ +### `maxReadSizeForVectorReads()` + +Maximum number of bytes which can be read in one go after merging the ranges. +Two ranges won't be merged if the combined data to be read is more than this value. ## Consistency diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java new file mode 100644 index 0000000000000..0485381e06c3b --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -0,0 +1,291 @@ +package org.apache.hadoop.fs.contract; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; + +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; + +@RunWith(Parameterized.class) +public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); + + public static final int DATASET_LEN = 1024; + private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); + private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; + private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt"; + private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256); + + + private final IntFunction allocate; + + private final String bufferType; + + @Parameterized.Parameters + public static List params() { + return Arrays.asList("direct", "array"); + } + + public AbstractContractVectoredReadTest(String bufferType) { + this.bufferType = bufferType; + this.allocate = "array".equals(bufferType) ? + ByteBuffer::allocate : ByteBuffer::allocateDirect; + } + + @Override + public void setup() throws Exception { + super.setup(); + Path path = path(VECTORED_READ_FILE_NAME); + FileSystem fs = getFileSystem(); + createFile(fs, path, true, DATASET); + Path bigFile = path(VECTORED_READ_FILE_1MB_NAME); + createFile(fs, bigFile, true, DATASET_MB); + } + + /** + * TODO: Improve coverage here by adding : + * 1. Some disjoint along with combinable ranges. + * 2. Non overlapping ranges. + * 3. Overlapping + Non overlapping ranges. + * 4. Limit buffer allocation. + * 5. Trying to exhaust thread pool -> maybe scale tests. + * 6. testNormalReadAfterVectoredRead + * 7. testVectoredReadAfterNormalRead + * 8. testMultipleVectoredReads + * 9. test allocate capacity limit errors. 
+ * + * @throws Exception + */ + @Test + public void testVectoredReadMultipleRanges() throws Exception { + describe("Running with buffer type : " + bufferType); + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + FileRange fileRange = new FileRangeImpl(i * 100, 100); + fileRanges.add(fileRange); + } + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + CompletableFuture[] completableFutures = new CompletableFuture[fileRanges.size()]; + int i = 0; + for (FileRange res : fileRanges) { + completableFutures[i++] = res.getData(); + } + CompletableFuture combinedFuture = CompletableFuture.allOf(completableFutures); + combinedFuture.get(); + + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testVectoredReadAndReadFully() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(100, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + byte[] readFullRes = new byte[100]; + in.readFully(100, readFullRes); + ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData()); + Assertions.assertThat(vecRes) + .describedAs("Result from vectored read and readFully must match") + .isEqualByComparingTo(ByteBuffer.wrap(readFullRes)); + } + } + + + @Test + public void testVectoredReadBigFile() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(1293, 25837)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_1MB_NAME))) { + in.readVectored(fileRanges, allocate); + ByteBuffer vecRes = FutureIOSupport.awaitFuture(fileRanges.get(0).getData()); + FileRange resRange = fileRanges.get(0); + assertDatasetEquals((int) resRange.getOffset(), "vecRead", + vecRes, resRange.getLength(), DATASET_MB); + } + } + + @Test + public void testOverlappingRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 100)); + fileRanges.add(new FileRangeImpl(90, 50)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + try { + ByteBuffer buffer = data.get(); + // Shouldn't reach here. + Assert.fail("EOFException must be thrown while reading EOF"); + } catch (ExecutionException ex) { + // ignore as expected. 
+ } catch (Exception ex) { + LOG.error("Exception while running vectored read ", ex); + Assert.fail("Exception while running vectored read " + ex); + } + } + } + } + + @Test + public void testNegativeLengthRange() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, -50)); + testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + } + + @Test + public void testNegativeOffsetRange() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(-1, 50)); + testExceptionalVectoredRead(fs, fileRanges, "Exception is expected"); + } + + @Test + public void testNormalReadAfterVectoredRead() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges, allocate); + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testVectoredReadAfterNormalRead() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + // read starting 200 bytes + byte[] res = new byte[200]; + in.read(res, 0, 200); + ByteBuffer buffer = ByteBuffer.wrap(res); + assertDatasetEquals(0, "normal_read", buffer, 200, DATASET); + Assertions.assertThat(in.getPos()) + .describedAs("Vectored read shouldn't change file pointer.") + .isEqualTo(200); + in.readVectored(fileRanges, allocate); + validateVectoredReadResult(fileRanges); + } + } + + @Test + public void testMultipleVectoredReads() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges1 = createSomeOverlappingRanges(); + List fileRanges2 = createSomeOverlappingRanges(); + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + in.readVectored(fileRanges1, allocate); + in.readVectored(fileRanges2, allocate); + validateVectoredReadResult(fileRanges2); + validateVectoredReadResult(fileRanges1); + } + } + + protected List createSomeOverlappingRanges() { + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(0, 100)); + fileRanges.add(new FileRangeImpl(90, 50)); + return fileRanges; + } + protected void validateVectoredReadResult(List fileRanges) { + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + try { + ByteBuffer buffer = FutureIOSupport.awaitFuture(data); + assertDatasetEquals((int) res.getOffset(), "vecRead", buffer, res.getLength(), DATASET); + } catch (Exception ex) { + LOG.error("Exception while running vectored read ", ex); + Assert.fail("Exception while running vectored read " + ex); + } + } + } + + protected void testExceptionalVectoredRead(FileSystem fs, + List fileRanges, + String s) throws IOException { + boolean exRaised = false; + try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) { + // Can we intercept here as done in S3 tests ?? + in.readVectored(fileRanges, ByteBuffer::allocate); + } catch (EOFException | IllegalArgumentException ex) { + // expected. 
+ exRaised = true; + } + Assertions.assertThat(exRaised) + .describedAs(s) + .isTrue(); + } + + /** + * Assert that the data read matches the dataset at the given offset. + * This helps verify that the seek process is moving the read pointer + * to the correct location in the file. + * @param readOffset the offset in the file where the read began. + * @param operation operation name for the assertion. + * @param data data read in. + * @param length length of data to check. + * @param originalData + */ + private void assertDatasetEquals( + final int readOffset, final String operation, + final ByteBuffer data, + int length, byte[] originalData) { + for (int i = 0; i < length; i++) { + int o = readOffset + i; + assertEquals(operation + " with read offset " + readOffset + + ": data[" + i + "] != DATASET[" + o + "]", + originalData[o], data.get()); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSConractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSConractVectoredRead.java new file mode 100644 index 0000000000000..264b6e202f8ce --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSConractVectoredRead.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.fs.contract.localfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestLocalFSConractVectoredRead extends AbstractContractVectoredReadTest { + + public TestLocalFSConractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java new file mode 100644 index 0000000000000..b5bd8791c89ed --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawLocalContractVectoredRead.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.fs.contract.rawlocal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +public class TestRawLocalContractVectoredRead extends AbstractContractVectoredReadTest { + + public TestRawLocalContractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new RawlocalFSContract(conf); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java new file mode 100644 index 0000000000000..01605f9ee53fd --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.PositionedReadable; +import org.junit.Test; + +import org.apache.hadoop.test.HadoopTestBase; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.IntBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; + +import org.assertj.core.api.Assertions; + +import static org.apache.hadoop.test.MoreAsserts.assertFutureFailedExceptionaly; +import static org.apache.hadoop.test.MoreAsserts.assertFutureCompletedSuccessfully; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Test behavior of {@link VectoredReadUtils}. + */ +public class TestVectoredReadUtils extends HadoopTestBase { + + @Test + public void testSliceTo() { + final int SIZE = 64 * 1024; + ByteBuffer buffer = ByteBuffer.allocate(SIZE); + // fill the buffer with data + IntBuffer intBuffer = buffer.asIntBuffer(); + for(int i=0; i < SIZE / Integer.BYTES; ++i) { + intBuffer.put(i); + } + // ensure we don't make unnecessary slices + ByteBuffer slice = VectoredReadUtils.sliceTo(buffer, 100, + new FileRangeImpl(100, SIZE)); + Assertions.assertThat(buffer) + .describedAs("Slicing on the same offset shouldn't " + + "create a new buffer") + .isEqualTo(slice); + + // try slicing a range + final int OFFSET = 100; + final int SLICE_START = 1024; + final int SLICE_LENGTH = 16 * 1024; + slice = VectoredReadUtils.sliceTo(buffer, OFFSET, + new FileRangeImpl(OFFSET + SLICE_START, SLICE_LENGTH)); + // make sure they aren't the same, but use the same backing data + Assertions.assertThat(buffer) + .describedAs("Slicing on new offset should " + + "create a new buffer") + .isNotEqualTo(slice); + Assertions.assertThat(buffer.array()) + .describedAs("Slicing should use the same underlying " + + "data") + .isEqualTo(slice.array()); + // test the contents of the slice + intBuffer = slice.asIntBuffer(); + for(int i=0; i < SLICE_LENGTH / Integer.BYTES; ++i) { + assertEquals("i = " + i, i + SLICE_START / Integer.BYTES, intBuffer.get()); + } + } + + @Test + public void testRounding() { + for(int i=5; i < 10; ++i) { + assertEquals("i = "+ i, 5, VectoredReadUtils.roundDown(i, 5)); + assertEquals("i = "+ i, 10, VectoredReadUtils.roundUp(i+1, 5)); + } + assertEquals(13, VectoredReadUtils.roundDown(13, 1)); + assertEquals(13, VectoredReadUtils.roundUp(13, 1)); + } + + @Test + public void testMerge() { + FileRange base = new FileRangeImpl(2000, 1000); + CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base); + 
+ // test when the gap between is too big + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2000, 4000)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the total size gets exceeded + assertFalse(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 3999)); + assertEquals(1, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(1000, mergeBase.getLength()); + + // test when the merge works + assertTrue(mergeBase.merge(5000, 6000, + new FileRangeImpl(5000, 1000), 2001, 4000)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(2000, mergeBase.getOffset()); + assertEquals(4000, mergeBase.getLength()); + + // reset the mergeBase and test with a 10:1 reduction + mergeBase = new CombinedFileRange(200, 300, base); + assertEquals(200, mergeBase.getOffset()); + assertEquals(100, mergeBase.getLength()); + assertTrue(mergeBase.merge(500, 600, + new FileRangeImpl(5000, 1000), 201, 400)); + assertEquals(2, mergeBase.getUnderlying().size()); + assertEquals(200, mergeBase.getOffset()); + assertEquals(400, mergeBase.getLength()); + } + + @Test + public void testSortAndMerge() { + List input = Arrays.asList( + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse(VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + List outputList = VectoredReadUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals(1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals(3, output.getUnderlying().size()); + assertEquals("range[1000,3100)", output.toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + + // the minSeek doesn't allow the first two to merge + assertFalse(VectoredReadUtils.isOrderedDisjoint(input, 100, 1000)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1000, 2100); + assertEquals(2, outputList.size()); + assertEquals("range[1000,1100)", outputList.get(0).toString()); + assertEquals("range[2100,3100)", outputList.get(1).toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000)); + + // the maxSize doesn't allow the third range to merge + assertFalse(VectoredReadUtils.isOrderedDisjoint(input, 100, 800)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 100, 1001, 2099); + assertEquals(2, outputList.size()); + assertEquals("range[1000,2200)", outputList.get(0).toString()); + assertEquals("range[3000,3100)", outputList.get(1).toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800)); + + // test the round up and round down (the maxSize doesn't allow any merges) + assertFalse(VectoredReadUtils.isOrderedDisjoint(input, 16, 700)); + outputList = VectoredReadUtils.sortAndMergeRanges(input, 16, 1001, 100); + assertEquals(3, outputList.size()); + assertEquals("range[992,1104)", outputList.get(0).toString()); + assertEquals("range[2096,2208)", outputList.get(1).toString()); + assertEquals("range[2992,3104)", outputList.get(2).toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700)); + } + + @Test + public void testSortAndMergeMoreCases() throws Exception { + List input = Arrays.asList( + new FileRangeImpl(3000, 110), + new FileRangeImpl(3000, 100), + new FileRangeImpl(2100, 100), + new FileRangeImpl(1000, 100) + ); + assertFalse(VectoredReadUtils.isOrderedDisjoint(input, 
100, 800)); + List outputList = VectoredReadUtils.sortAndMergeRanges( + input, 1, 1001, 2500); + assertEquals(1, outputList.size()); + CombinedFileRange output = outputList.get(0); + assertEquals(4, output.getUnderlying().size()); + assertEquals("range[1000,3110)", output.toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + outputList = VectoredReadUtils.sortAndMergeRanges( + input, 100, 1001, 2500); + assertEquals(1, outputList.size()); + output = outputList.get(0); + assertEquals(4, output.getUnderlying().size()); + assertEquals("range[1000,3200)", output.toString()); + assertTrue(VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800)); + + } + interface Stream extends PositionedReadable, ByteBufferPositionedReadable { + // nothing + } + + static void fillBuffer(ByteBuffer buffer) { + byte b = 0; + while (buffer.remaining() > 0) { + buffer.put(b++); + } + } + + @Test + public void testReadRangeFromByteBufferPositionedReadable() throws Exception { + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals(100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionaly(result); + } + + static void runReadRangeFromPositionedReadable(IntFunction allocate) throws Exception { + PositionedReadable stream = Mockito.mock(PositionedReadable.class); + Mockito.doAnswer(invocation -> { + byte b=0; + byte[] buffer = invocation.getArgument(1); + for(int i=0; i < buffer.length; ++i) { + buffer[i] = b++; + } + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + CompletableFuture result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + allocate); + assertFutureCompletedSuccessfully(result); + ByteBuffer buffer = result.get(); + assertEquals(100, buffer.remaining()); + byte b = 0; + while (buffer.remaining() > 0) { + assertEquals("remain = " + buffer.remaining(), b++, buffer.get()); + } + + // test an IOException + Mockito.reset(stream); + Mockito.doThrow(new IOException("foo")) + .when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(), ArgumentMatchers.anyInt(), + ArgumentMatchers.anyInt()); + result = + VectoredReadUtils.readRangeFrom(stream, new FileRangeImpl(1000, 100), + ByteBuffer::allocate); + assertFutureFailedExceptionaly(result); + } + + @Test + public void testReadRangeArray() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocate); + } + + @Test + public void testReadRangeDirect() throws Exception { + runReadRangeFromPositionedReadable(ByteBuffer::allocateDirect); + } + + static void validateBuffer(String message, ByteBuffer buffer, int start) { + byte expected = (byte) start; + while 
(buffer.remaining() > 0) { + assertEquals(message + " remain: " + buffer.remaining(), expected++, + buffer.get()); + } + } + + @Test + public void testReadAsync() throws Exception { + List input = Arrays.asList(new FileRangeImpl(0, 100), + new FileRangeImpl(100_000, 100), + new FileRangeImpl(200_000, 100)); + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + // should not merge the ranges + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 100, 100); + Mockito.verify(stream, Mockito.times(3)) + .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); + for(int b=0; b < input.size(); ++b) { + validateBuffer("buffer " + b, input.get(b).getData().get(), 0); + } + } + + @Test + public void testReadAsyncMerge() throws Exception { + List input = Arrays.asList(new FileRangeImpl(2000, 100), + new FileRangeImpl(1000, 100), + new FileRangeImpl(0, 100)); + Stream stream = Mockito.mock(Stream.class); + Mockito.doAnswer(invocation -> { + fillBuffer(invocation.getArgument(1)); + return null; + }).when(stream).readFully(ArgumentMatchers.anyLong(), + ArgumentMatchers.any(ByteBuffer.class)); + // should merge the ranges into a single read + VectoredReadUtils.readVectored(stream, input, ByteBuffer::allocate, 1000, 2100); + Mockito.verify(stream, Mockito.times(1)) + .readFully(ArgumentMatchers.anyLong(), ArgumentMatchers.any(ByteBuffer.class)); + for(int b=0; b < input.size(); ++b) { + validateBuffer("buffer " + b, input.get(b).getData().get(), (2 - b) * 1000); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java index 142669b78682e..7dc6ab12a54ea 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java @@ -19,6 +19,9 @@ package org.apache.hadoop.test; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +import org.assertj.core.api.Assertions; import org.junit.Assert; /** @@ -28,17 +31,18 @@ public class MoreAsserts { /** * Assert equivalence for array and iterable - * @param the type of the elements - * @param s the name/message for the collection - * @param expected the expected array of elements - * @param actual the actual iterable of elements + * + * @param the type of the elements + * @param s the name/message for the collection + * @param expected the expected array of elements + * @param actual the actual iterable of elements */ public static void assertEquals(String s, T[] expected, Iterable actual) { Iterator it = actual.iterator(); int i = 0; for (; i < expected.length && it.hasNext(); ++i) { - Assert.assertEquals("Element "+ i +" for "+ s, expected[i], it.next()); + Assert.assertEquals("Element " + i + " for " + s, expected[i], it.next()); } Assert.assertTrue("Expected more elements", i == expected.length); Assert.assertTrue("Expected less elements", !it.hasNext()); @@ -46,7 +50,8 @@ public static void assertEquals(String s, T[] expected, /** * Assert equality for two iterables - * @param the type of the elements + * + * @param the type of the elements * @param s * @param expected * @param actual @@ -57,10 +62,28 @@ public static void assertEquals(String s, 
Iterable expected, Iterator ita = actual.iterator(); int i = 0; while (ite.hasNext() && ita.hasNext()) { - Assert.assertEquals("Element "+ i +" for "+s, ite.next(), ita.next()); + Assert.assertEquals("Element " + i + " for " + s, ite.next(), ita.next()); } Assert.assertTrue("Expected more elements", !ite.hasNext()); Assert.assertTrue("Expected less elements", !ita.hasNext()); } + + public static void assertFutureCompletedSuccessfully(CompletableFuture future) { + Assertions.assertThat(future.isDone()) + .describedAs("This future is supposed to be " + + "completed successfully") + .isTrue(); + Assertions.assertThat(future.isCompletedExceptionally()) + .describedAs("This future is supposed to be " + + "completed successfully") + .isFalse(); + } + + public static void assertFutureFailedExceptionaly(CompletableFuture future) { + Assertions.assertThat(future.isCompletedExceptionally()) + .describedAs("This future is supposed to be " + + "completed exceptionally") + .isTrue(); + } } diff --git a/hadoop-common-project/pom.xml b/hadoop-common-project/pom.xml index b36dbf30610ff..f167a079a9b0c 100644 --- a/hadoop-common-project/pom.xml +++ b/hadoop-common-project/pom.xml @@ -56,5 +56,4 @@ - diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index ca6886641fdb2..725899ef69ef3 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -212,6 +212,7 @@ 2.4.7 9.8.1 1.10.11 + 1.20 @@ -1545,6 +1546,16 @@ + + org.openjdk.jmh + jmh-core + ${jmh.version} + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + org.apache.curator curator-test diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 115abe302f61f..35ce29483f407 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1499,7 +1499,8 @@ private FSDataInputStream open( new S3AInputStream( readContext, createObjectAttributes(fileStatus), - createInputStreamCallbacks(auditSpan))); + createInputStreamCallbacks(auditSpan), + unboundedThreadPool)); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3a0b669543edf..37de3a90eaaeb 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -24,7 +24,12 @@ import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.impl.CombinedFileRange; +import org.apache.hadoop.fs.impl.FutureIOSupport; +import org.apache.hadoop.fs.impl.VectoredReadUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.CanSetReadahead; @@ -46,8 +51,17 @@ import java.io.EOFException; import java.io.IOException; import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.ThreadPoolExecutor; +import java.util.function.IntFunction; import static org.apache.commons.lang3.StringUtils.isNotEmpty; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.isOrderedDisjoint; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sliceTo; +import static org.apache.hadoop.fs.impl.VectoredReadUtils.sortAndMergeRanges; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -100,6 +114,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private S3ObjectInputStream wrappedStream; private final S3AReadOpContext context; private final InputStreamCallbacks client; + private final ThreadPoolExecutor unboundedThreadPool; private final String bucket; private final String key; private final String pathStr; @@ -145,8 +160,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, * @param client S3 client to use */ public S3AInputStream(S3AReadOpContext ctx, - S3ObjectAttributes s3Attributes, - InputStreamCallbacks client) { + S3ObjectAttributes s3Attributes, + InputStreamCallbacks client, + ThreadPoolExecutor unboundedThreadPool) { Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket"); Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); @@ -168,6 +184,7 @@ public S3AInputStream(S3AReadOpContext ctx, s3Attributes); setInputPolicy(ctx.getInputPolicy()); setReadahead(ctx.getReadahead()); + this.unboundedThreadPool = unboundedThreadPool; } /** @@ -793,6 +810,208 @@ public void readFully(long position, byte[] buffer, int offset, int length) } } + /** + * {@inheritDoc} + * Vectored read implementation for S3AInputStream. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + // TODO: support cancellation of vectored read calls. + // No upfront cancellation is supported right now, but all queued + // tasks are aborted when the thread pool is shut down in S3AFileSystem.close(). + // TODO: corner cases of the unbounded pool, such as starvation; consider a + // separate thread pool for the vectored read API so that heavy use of it + // cannot starve other operations such as list and delete. + // TODO: handle the case where a combined range becomes so large that its + // buffer cannot be allocated. + checkNotClosed(); + for (FileRange range : ranges) { + validateRangeRequest(range); + CompletableFuture result = new CompletableFuture<>(); + range.setData(result); + } + + if (isOrderedDisjoint(ranges, 1, minSeekForVectorReads())) { + for (FileRange range : ranges) { + ByteBuffer buffer = allocate.apply(range.getLength()); + unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); + } + } else { + for (CombinedFileRange combinedFileRange : sortAndMergeRanges(ranges, 1, minSeekForVectorReads(), + maxReadSizeForVectorReads())) { + CompletableFuture result = new CompletableFuture<>(); + ByteBuffer buffer = allocate.apply(combinedFileRange.getLength()); + combinedFileRange.setData(result); + unboundedThreadPool.submit( + () -> readCombinedRangeAndUpdateChildren(combinedFileRange, buffer)); + } + } + } + + /** + * Read data for the combined range and update the buffers + * of all its underlying ranges. + * @param combinedFileRange combined range. + * @param buffer combined buffer.
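+   * If reading the combined range fails, the exception is propagated to the
+   * future of every underlying child range.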
+ */ + private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, + ByteBuffer buffer) { + readSingleRange(combinedFileRange, buffer); + try { + ByteBuffer combinedBuffer = FutureIOSupport.awaitFuture(combinedFileRange.getData()); + for (FileRange child : combinedFileRange.getUnderlying()) { + updateOriginalRange(child, combinedBuffer, combinedFileRange); + } + } catch (IOException ex) { + LOG.error("Exception occurred while reading combined range ", ex); + for (FileRange child : combinedFileRange.getUnderlying()) { + child.getData().completeExceptionally(ex); + } + } + } + + /** + * Update data in the child range from the combined range. + * @param child child range. + * @param combinedBuffer combined buffer. + * @param combinedFileRange combined range. + */ + private void updateOriginalRange(FileRange child, + ByteBuffer combinedBuffer, + CombinedFileRange combinedFileRange) { + LOG.trace("Start filling original range [{}, {}) from combined range [{}, {}) ", + child.getOffset(), child.getLength(), combinedFileRange.getOffset(), combinedFileRange.getLength()); + ByteBuffer childBuffer = sliceTo(combinedBuffer, combinedFileRange.getOffset(), child); + child.getData().complete(childBuffer); + LOG.trace("End filling original range [{}, {}) from combined range [{}, {}) ", + child.getOffset(), child.getLength(), combinedFileRange.getOffset(), combinedFileRange.getLength()); + } + + /** + * Validates range parameters. + * @param range requested range. + * @throws EOFException end of file exception. + */ + private void validateRangeRequest(FileRange range) throws EOFException { + VectoredReadUtils.validateRangeRequest(range); + if (range.getOffset() + range.getLength() > contentLength) { + LOG.error("Requested range with offset {} and length {} is beyond EOF", + range.getOffset(), range.getLength()); + throw new EOFException("Requested range with offset " + range.getOffset() + " and length " + + range.getLength() + " is beyond EOF"); + } + } + + /** + * Read data from S3 for this range and populate the buffer. + * TODO: decide where retries around client.getObject() belong. Plain reads + * already retry in the layer above, and retrying here after a heap buffer has + * been partially filled could overwrite offsets that were already written, so + * retry logic should probably be added only in {@link S3AInputStream#getS3Object}. + * @param range range of data to read. + * @param buffer buffer to fill. + */ + private void readSingleRange(FileRange range, ByteBuffer buffer) { + LOG.debug("Start reading range {} ", range); + S3Object objectRange = null; + S3ObjectInputStream objectContent = null; + try { + long position = range.getOffset(); + int length = range.getLength(); + final String operationName = "readVectored"; + objectRange = getS3Object(operationName, position, length); + objectContent = objectRange.getObjectContent(); + if (objectContent == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); + } + populateBuffer(length, buffer, objectContent); + range.getData().complete(buffer); + } catch (IOException ex) { + LOG.error("Exception while reading a range {} ", range, ex); + range.getData().completeExceptionally(ex); + } finally { + IOUtils.cleanupWithLogger(LOG, objectRange, objectContent); + } + LOG.debug("Finished reading range {} ", range); + } + + /** + * Populates the buffer with up to {@code length} bytes of data from + * objectContent. Handles both direct and heap byte buffers.
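+   * A direct buffer has no accessible backing array, so the bytes are first read
+   * into a temporary byte[] and then copied in; a heap buffer is filled in place
+   * through its backing array.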
+ * @param length length of data to populate. + * @param buffer buffer to fill. + * @param objectContent result retrieved from S3 store. + * @throws IOException any IOE. + */ + private void populateBuffer(int length, + ByteBuffer buffer, + S3ObjectInputStream objectContent) throws IOException { + if (buffer.isDirect()) { + byte[] tmp = new byte[length]; + readByteArray(objectContent, tmp, 0, length); + buffer.put(tmp); + buffer.flip(); + } else { + readByteArray(objectContent, buffer.array(), 0, length); + } + } + + public void readByteArray(S3ObjectInputStream objectContent, + byte[] dest, + int offset, + int length) throws IOException { + int readBytes = 0; + while ( readBytes < length) { + int readBytesCurr = objectContent.read(dest, + offset + readBytes, + length - readBytes); + readBytes +=readBytesCurr; + if (readBytesCurr < 0) { + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); + } + } + } + + /** + * Read data from S3 using a http request. + * This also handles if file has been changed while http call + * is getting executed. If file has been changed RemoteFileChangedException + * is thrown. + * @param operationName name of the operation for which get object on S3 is called. + * @param position position of the object to be read from S3. + * @param length length from position of the object to be read from S3. + * @return S3Object + * @throws IOException exception if any. + */ + private S3Object getS3Object(String operationName, long position, + int length) throws IOException { + final GetObjectRequest request = client.newGetRequest(key) + .withRange(position, position + length - 1); + changeTracker.maybeApplyConstraint(request); + DurationTracker tracker = streamStatistics.initiateGetRequest(); + S3Object objectRange; + Invoker invoker = context.getReadInvoker(); + try { + objectRange = invoker.retry(operationName, uri, true, + () -> client.getObject(request)); + } catch (IOException ex) { + tracker.failed(); + throw ex; + } finally { + tracker.close(); + } + changeTracker.processResponse(objectRange, operationName, + position); + return objectRange; + } + /** * Access the input stream statistics. * This is for internal testing and may be removed without warning. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java new file mode 100644 index 0000000000000..41148100b6203 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +import java.util.ArrayList; +import java.util.List; + +public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { + + public ITestS3AContractVectoredRead(String bufferType) { + super(bufferType); + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + /** + * Overriding in S3 vectored read api fails fast in case of EOF + * requested range. 
+ * @throws Exception + */ + @Override + public void testEOFRanges() throws Exception { + FileSystem fs = getFileSystem(); + List fileRanges = new ArrayList<>(); + fileRanges.add(new FileRangeImpl(DATASET_LEN, 100)); + testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected"); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java index db5b5b56851ea..95194f71661a7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java @@ -109,7 +109,8 @@ private S3AInputStream getMockedS3AInputStream() { return new S3AInputStream( s3AReadOpContext, s3ObjectAttributes, - getMockedInputStreamCallback()); + getMockedInputStreamCallback(), + null); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 653c8a75e69ff..bd583abdaddf6 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -52,7 +52,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # for debugging low level S3a operations, uncomment these lines # Log all S3A classes -#log4j.logger.org.apache.hadoop.fs.s3a=DEBUG +log4j.logger.org.apache.hadoop.fs.s3a=DEBUG #log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO # Log S3Guard classes diff --git a/hadoop-tools/hadoop-benchmark/pom.xml b/hadoop-tools/hadoop-benchmark/pom.xml new file mode 100644 index 0000000000000..5682ff96da743 --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/pom.xml @@ -0,0 +1,83 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.4.0-SNAPSHOT + ../../hadoop-project/pom.xml + + hadoop-benchmark + 3.4.0-SNAPSHOT + jar + + Apache Hadoop Common Benchmark + Apache Hadoop Common Benchmark + + + + org.apache.hadoop + hadoop-common + + + org.openjdk.jmh + jmh-core + + + org.openjdk.jmh + jmh-generator-annprocess + + + + + + + maven-assembly-plugin + + + + org.apache.hadoop.benchmark.VectoredReadBenchmark + + + + src/main/assembly/uber.xml + + + + + make-assembly + package + + single + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + ${basedir}/src/main/findbugs/exclude.xml + + + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml new file mode 100644 index 0000000000000..014eab951b3cf --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/assembly/uber.xml @@ -0,0 +1,33 @@ + + + uber + + jar + + false + + + / + true + true + runtime + + + + + metaInf-services + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml new file mode 100644 index 0000000000000..f0b00b84b4d45 --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/findbugs/exclude.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java new file mode 100644 index 0000000000000..2abcdd12daece --- /dev/null +++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java @@ 
-0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.benchmark; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.FileSystems; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.IntFunction; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileRangeImpl; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class VectoredReadBenchmark { + + static final Path DATA_PATH = getTestDataPath(); + static final String DATA_PATH_PROPERTY = "bench.data"; + static final int READ_SIZE = 64 * 1024; + static final long SEEK_SIZE = 1024L * 1024; + + + static Path getTestDataPath() { + String value = System.getProperty(DATA_PATH_PROPERTY); + return new Path(value == null ? "/tmp/taxi.orc" : value); + } + + @State(Scope.Thread) + public static class FileSystemChoice { + + @Param({"local", "raw"}) + String fileSystemKind; + + Configuration conf; + FileSystem fs; + + @Setup(Level.Trial) + public void setup() { + conf = new Configuration(); + try { + LocalFileSystem local = FileSystem.getLocal(conf); + fs = "raw".equals(fileSystemKind) ? local.getRaw() : local; + } catch (IOException e) { + throw new IllegalArgumentException("Can't get filesystem", e); + } + } + } + + @State(Scope.Thread) + public static class BufferChoice { + @Param({"direct", "array"}) + String bufferKind; + + IntFunction allocate; + @Setup(Level.Trial) + public void setup() { + allocate = "array".equals(bufferKind) + ? 
ByteBuffer::allocate : ByteBuffer::allocateDirect; + } + } + + @Benchmark + public void asyncRead(FileSystemChoice fsChoice, + BufferChoice bufferChoice, + Blackhole blackhole) throws Exception { + FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); + List ranges = new ArrayList<>(); + for(int m=0; m < 100; ++m) { + FileRangeImpl range = new FileRangeImpl(m * SEEK_SIZE, READ_SIZE); + ranges.add(range); + } + stream.readVectored(ranges, bufferChoice.allocate); + for(FileRange range: ranges) { + blackhole.consume(range.getData().get()); + } + stream.close(); + } + + static class Joiner implements CompletionHandler { + private int remaining; + private final ByteBuffer[] result; + private Throwable exception = null; + + Joiner(int total) { + remaining = total; + result = new ByteBuffer[total]; + } + + synchronized void finish() { + remaining -= 1; + if (remaining == 0) { + notify(); + } + } + + synchronized ByteBuffer[] join() throws InterruptedException, IOException { + while (remaining > 0 && exception == null) { + wait(); + } + if (exception != null) { + throw new IOException("problem reading", exception); + } + return result; + } + + + @Override + public synchronized void completed(ByteBuffer buffer, FileRange attachment) { + result[--remaining] = buffer; + if (remaining == 0) { + notify(); + } + } + + @Override + public synchronized void failed(Throwable exc, FileRange attachment) { + this.exception = exc; + notify(); + } + } + + static class FileRangeCallback extends FileRangeImpl implements CompletionHandler { + private final AsynchronousFileChannel channel; + private final ByteBuffer buffer; + private int completed = 0; + private final Joiner joiner; + + FileRangeCallback(AsynchronousFileChannel channel, long offset, + int length, Joiner joiner, ByteBuffer buffer) { + super(offset, length); + this.channel = channel; + this.joiner = joiner; + this.buffer = buffer; + } + + @Override + public void completed(Integer result, FileRangeCallback attachment) { + final int bytes = result; + if (bytes == -1) { + failed(new EOFException("Read past end of file"), this); + } + completed += bytes; + if (completed < length) { + channel.read(buffer, offset + completed, this, this); + } else { + buffer.flip(); + joiner.finish(); + } + } + + @Override + public void failed(Throwable exc, FileRangeCallback attachment) { + joiner.failed(exc, this); + } + } + + @Benchmark + public void asyncFileChanArray(BufferChoice bufferChoice, + Blackhole blackhole) throws Exception { + java.nio.file.Path path = FileSystems.getDefault().getPath(DATA_PATH.toString()); + AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ); + List ranges = new ArrayList<>(); + Joiner joiner = new Joiner(100); + for(int m=0; m < 100; ++m) { + ByteBuffer buffer = bufferChoice.allocate.apply(READ_SIZE); + FileRangeCallback range = new FileRangeCallback(channel, m * SEEK_SIZE, + READ_SIZE, joiner, buffer); + ranges.add(range); + channel.read(buffer, range.getOffset(), range, range); + } + joiner.join(); + channel.close(); + blackhole.consume(ranges); + } + + @Benchmark + public void syncRead(FileSystemChoice fsChoice, + Blackhole blackhole) throws Exception { + FSDataInputStream stream = fsChoice.fs.open(DATA_PATH); + List result = new ArrayList<>(); + for(int m=0; m < 100; ++m) { + byte[] buffer = new byte[READ_SIZE]; + stream.readFully(m * SEEK_SIZE, buffer); + result.add(buffer); + } + blackhole.consume(result); + stream.close(); + } + + /** + * Run the benchmarks. 
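+   * Typically run from the shaded (uber) jar built by this module, passing the
+   * data file path as the only argument.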
+ * @param args the pathname of a 100MB data file + */ + public static void main(String[] args) throws Exception { + OptionsBuilder opts = new OptionsBuilder(); + opts.include("VectoredReadBenchmark"); + opts.jvmArgs("-server", "-Xms256m", "-Xmx2g", + "-D" + DATA_PATH_PROPERTY + "=" + args[0]); + opts.forks(1); + new Runner(opts.build()).run(); + } +} diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index f026bc261e00b..4e934cd101f85 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -51,6 +51,7 @@ hadoop-azure-datalake hadoop-aliyun hadoop-fs2img + hadoop-benchmark diff --git a/pom.xml b/pom.xml index 5bdf9fd71868f..281c4f09fe0b2 100644 --- a/pom.xml +++ b/pom.xml @@ -538,6 +538,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/x licenses-binary/** dev-support/docker/pkg-resolver/packages.json dev-support/docker/pkg-resolver/platforms.json + **/target/**
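
For orientation, and not part of the patch itself: a minimal caller-side sketch of the vectored read API that the changes above add, in the spirit of the benchmark's asyncRead case. The path, buffer sizes and use of the default filesystem are placeholder assumptions; only FileRangeImpl, FSDataInputStream.readVectored() and FileRange.getData() come from the diffs above.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileRangeImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try (FSDataInputStream in = fs.open(new Path(args[0]))) {
      // describe the byte ranges to fetch; the stream may merge nearby ranges internally
      List<FileRange> ranges = new ArrayList<>();
      ranges.add(new FileRangeImpl(0, 64 * 1024));
      ranges.add(new FileRangeImpl(1024 * 1024, 64 * 1024));
      // issue all reads at once; each range carries its own future
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().get();   // block until this range is filled
        System.out.println("range at " + range.getOffset()
            + " returned " + data.remaining() + " bytes");
      }
    }
  }
}

Because each range completes through its own future, a caller can start consuming early ranges while later ones are still in flight, and any merging of nearby ranges stays invisible to it.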