From 03a8c8b9bce1358ed43db0b4e4e042cb55fb4b1c Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 13 Mar 2023 14:40:40 +0530 Subject: [PATCH 01/17] BASE COMMIT --- .../org/apache/hadoop/fs/s3a/Constants.java | 4 ++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 28 ++++++----- .../apache/hadoop/fs/s3a/S3ADataBlocks.java | 49 ++++++++++--------- .../hadoop/fs/s3a/S3AInstrumentation.java | 8 +-- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 13 +++++ .../hadoop/fs/s3a/WriteOperationHelper.java | 2 +- .../apache/hadoop/fs/s3a/WriteOperations.java | 2 +- .../hadoop/fs/s3a/api/RequestFactory.java | 2 +- .../fs/s3a/impl/RequestFactoryImpl.java | 2 +- .../BlockOutputStreamStatistics.java | 8 +-- .../impl/EmptyS3AStatisticsContext.java | 8 +-- 11 files changed, 73 insertions(+), 53 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index e5b0a9b5aa163..b71b319462b6e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1255,4 +1255,8 @@ private Constants() { */ public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count"; public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8; + + public static final String ALLOW_MULTIPART_UPLOADS = "fs.s3a.allow.multipart.uploads"; + + public static final boolean IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT = true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 19943ff2f70da..2dbff80730daa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements private final String key; /** Size of all blocks. */ - private final int blockSize; + private final long blockSize; /** IO Statistics. */ private final IOStatistics iostatistics; @@ -318,7 +318,7 @@ public synchronized void write(byte[] source, int offset, int len) statistics.writeBytes(len); S3ADataBlocks.DataBlock block = createBlockIfNeeded(); int written = block.write(source, offset, len); - int remainingCapacity = block.remainingCapacity(); + int remainingCapacity = (int) block.remainingCapacity(); if (written < len) { // not everything was written —the block has run out // of capacity @@ -558,19 +558,21 @@ public String toString() { } /** - * Upload the current block as a single PUT request; if the buffer - * is empty a 0-byte PUT will be invoked, as it is needed to create an - * entry at the far end. - * @throws IOException any problem. - * @return number of bytes uploaded. If thread was interrupted while - * waiting for upload to complete, returns zero with interrupted flag set - * on this thread. + * Upload the current block as a single PUT request; if the buffer is empty a + * 0-byte PUT will be invoked, as it is needed to create an entry at the far + * end. + * + * @return number of bytes uploaded. If thread was interrupted while waiting + * for upload to complete, returns zero with interrupted flag set on this + * thread. + * @throws IOException + * any problem. 
*/ - private int putObject() throws IOException { + private long putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); - int size = block.dataSize(); + long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( @@ -835,7 +837,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, Preconditions.checkNotNull(uploadId, "Null uploadId"); maybeRethrowUploadFailure(); partsSubmitted++; - final int size = block.dataSize(); + final long size = block.dataSize(); bytesSubmitted += size; final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request; @@ -1011,7 +1013,7 @@ public void progressChanged(ProgressEvent progressEvent) { ProgressEventType eventType = progressEvent.getEventType(); long bytesTransferred = progressEvent.getBytesTransferred(); - int size = block.dataSize(); + long size = block.dataSize(); switch (eventType) { case REQUEST_BYTE_TRANSFER_EVENT: diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 03b5bd96162af..518c2277623ba 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) { * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(long index, int limit, + abstract DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException; @@ -258,7 +258,7 @@ final DestState getState() { * Return the current data size. * @return the size of the data */ - abstract int dataSize(); + abstract long dataSize(); /** * Predicate to verify that the block has the capacity to write @@ -280,7 +280,7 @@ boolean hasData() { * The remaining capacity in the block before it is full. * @return the number of bytes remaining. */ - abstract int remainingCapacity(); + abstract long remainingCapacity(); /** * Write a series of bytes from the buffer, from the offset. @@ -391,7 +391,7 @@ static class ArrayBlockFactory extends BlockFactory { } @Override - DataBlock create(long index, int limit, + DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { return new ByteArrayBlock(0, limit, statistics); @@ -436,11 +436,11 @@ static class ByteArrayBlock extends DataBlock { private Integer dataSize; ByteArrayBlock(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.limit = limit; - buffer = new S3AByteArrayOutputStream(limit); + this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; + buffer = new S3AByteArrayOutputStream((int) limit); blockAllocated(); } @@ -449,7 +449,7 @@ static class ByteArrayBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? 
dataSize : buffer.size(); } @@ -468,14 +468,14 @@ boolean hasCapacity(long bytes) { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - dataSize(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); buffer.write(b, offset, written); return written; } @@ -514,7 +514,7 @@ static class ByteBufferBlockFactory extends BlockFactory { } @Override - ByteBufferBlock create(long index, int limit, + ByteBufferBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { return new ByteBufferBlock(index, limit, statistics); @@ -564,11 +564,12 @@ class ByteBufferBlock extends DataBlock { * @param statistics statistics to update */ ByteBufferBlock(long index, - int bufferSize, + long bufferSize, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.bufferSize = bufferSize; - blockBuffer = requestBuffer(bufferSize); + this.bufferSize = bufferSize > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int) bufferSize; + blockBuffer = requestBuffer((int) bufferSize); blockAllocated(); } @@ -577,7 +578,7 @@ class ByteBufferBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : bufferCapacityUsed(); } @@ -598,7 +599,7 @@ public boolean hasCapacity(long bytes) { } @Override - public int remainingCapacity() { + public long remainingCapacity() { return blockBuffer != null ? blockBuffer.remaining() : 0; } @@ -609,7 +610,7 @@ private int bufferCapacityUsed() { @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); blockBuffer.put(b, offset, written); return written; } @@ -809,7 +810,7 @@ static class DiskBlockFactory extends BlockFactory { */ @Override DataBlock create(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) throws IOException { File destFile = getOwner() @@ -825,14 +826,14 @@ DataBlock create(long index, */ static class DiskBlock extends DataBlock { - private int bytesWritten; + private long bytesWritten; private final File bufferFile; - private final int limit; + private final long limit; private BufferedOutputStream out; private final AtomicBoolean closed = new AtomicBoolean(false); DiskBlock(File bufferFile, - int limit, + long limit, long index, BlockOutputStreamStatistics statistics) throws FileNotFoundException { @@ -844,7 +845,7 @@ static class DiskBlock extends DataBlock { } @Override - int dataSize() { + long dataSize() { return bytesWritten; } @@ -854,14 +855,14 @@ boolean hasCapacity(long bytes) { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - bytesWritten; } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); out.write(b, offset, written); bytesWritten += written; return written; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 46568ec2a8d3d..94ce7c70315c2 100644 --- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1505,7 +1505,7 @@ public void blockReleased() { * of block uploads pending (1) and the bytes pending (blockSize). */ @Override - public void blockUploadQueued(int blockSize) { + public void blockUploadQueued(long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); @@ -1518,7 +1518,7 @@ public void blockUploadQueued(int blockSize) { * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. */ @Override - public void blockUploadStarted(Duration timeInQueue, int blockSize) { + public void blockUploadStarted(Duration timeInQueue, long blockSize) { // the local counter is used in toString reporting. queueDuration.addAndGet(timeInQueue.toMillis()); // update the duration fields in the IOStatistics. @@ -1546,7 +1546,7 @@ private IOStatisticsStore localIOStatistics() { @Override public void blockUploadCompleted( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); blockUploadsCompleted.incrementAndGet(); @@ -1560,7 +1560,7 @@ public void blockUploadCompleted( @Override public void blockUploadFailed( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 8a1947f3e42dc..eac2a182173d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1031,6 +1031,19 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } + public static boolean checkDiskBuffer(Configuration conf){ + boolean isAllowedMultipart = conf.getBoolean(ALLOW_MULTIPART_UPLOADS, + IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT); + if (isAllowedMultipart) { + return true; + } else if (!isAllowedMultipart && conf.get(FAST_UPLOAD_BUFFER) + .equals(FAST_UPLOAD_BUFFER_DISK)){ + return true; + } else { + return false; + } + } + /** * Ensure that the long value is in the range of an integer. 
* @param name property name for error messages diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 14ffeed4a55bb..7813255050c8e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -532,7 +532,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 321390446f705..32888314d881a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -233,7 +233,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index cae4d3ef034e8..c08adab970370 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -248,7 +248,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index ce11df0383929..56a1330a98c61 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -509,7 +509,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index bd1466b2a432f..554b628d003a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable, * Block is queued for upload. * @param blockSize block size. */ - void blockUploadQueued(int blockSize); + void blockUploadQueued(long blockSize); /** * Queued block has been scheduled for upload. * @param timeInQueue time in the queue. * @param blockSize block size. 
*/ - void blockUploadStarted(Duration timeInQueue, int blockSize); + void blockUploadStarted(Duration timeInQueue, long blockSize); /** * A block upload has completed. Duration excludes time in the queue. * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize); + void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize); /** * A block upload has failed. Duration excludes time in the queue. @@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable, * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize); + void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize); /** * Intermediate report of bytes uploaded. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index d10b6484175b1..6454065b240c6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -442,22 +442,22 @@ private static final class EmptyBlockOutputStreamStatistics implements BlockOutputStreamStatistics { @Override - public void blockUploadQueued(final int blockSize) { + public void blockUploadQueued(final long blockSize) { } @Override public void blockUploadStarted(final Duration timeInQueue, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadCompleted(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadFailed(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override From 768f41bb84412e6d296a5c5751e6bca608f2c9d1 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 13 Mar 2023 15:11:41 +0530 Subject: [PATCH 02/17] ADDED TEST --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../hadoop/fs/s3a/WriteOperationHelper.java | 4 +-- .../fs/s3a/scale/ITestS3AHugeFileUpload.java | 31 +++++++++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3e6f2322d3b00..79c19c7cdf622 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -595,7 +595,7 @@ public void initialize(URI name, Configuration originalConf) } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); - partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); + //partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 7813255050c8e..c394a8031d094 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,8 +269,8 @@ public PutObjectRequest createPutObjectRequest( String dest, File sourceFile, final PutObjectOptions options) { - Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, - "File length is too big for a single PUT upload"); + //Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, + // "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java new file mode 100644 index 0000000000000..de11ed1686569 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ + final private Logger LOG = LoggerFactory.getLogger( + ITestS3AHugeFileUpload.class.getName()); + + private long fileSize = Integer.MAX_VALUE * 2L; + @Override + protected Configuration createScaleConfiguration() { + Configuration configuration = super.createScaleConfiguration(); + configuration.setBoolean(Constants.ALLOW_MULTIPART_UPLOADS, false); + configuration.setInt(KEY_TEST_TIMEOUT, 36000); + return configuration; + } + + @Test + public void uploadFileSinglePut() throws IOException { + LOG.info("Creating file with size : {}", fileSize); + ContractTestUtils.createAndVerifyFile(getFileSystem(), + getTestPath(), fileSize ); + } +} From ddde1e6b14894483962dc577d7d8afe6e802869a Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 13 Mar 2023 15:55:10 +0530 Subject: [PATCH 03/17] MORE FIXES --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 19 +++++++++++++++++-- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 9 ++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 2dbff80730daa..6a9a450ed6dae 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements /** Thread level IOStatistics Aggregator. */ private final IOStatisticsAggregator threadIOStatisticsAggregator; + /**Is multipart upload allowed? 
*/ + private final boolean isMultipartAllowed; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -181,7 +184,7 @@ class S3ABlockOutputStream extends OutputStream implements this.builder = builder; this.key = builder.key; this.blockFactory = builder.blockFactory; - this.blockSize = (int) builder.blockSize; + this.blockSize = builder.blockSize; this.statistics = builder.statistics; // test instantiations may not provide statistics; this.iostatistics = statistics.getIOStatistics(); @@ -200,7 +203,8 @@ class S3ABlockOutputStream extends OutputStream implements createBlockIfNeeded(); LOG.debug("Initialized S3ABlockOutputStream for {}" + " output to {}", key, activeBlock); - if (putTracker.initialize()) { + this.isMultipartAllowed = builder.isMultipartAllowed; + if (putTracker.initialize() && this.isMultipartAllowed) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); } @@ -1128,6 +1132,11 @@ public static final class BlockOutputStreamBuilder { */ private IOStatisticsAggregator ioStatisticsAggregator; + /** + * Is Multipart Uploads enabled for the given upload + */ + private boolean isMultipartAllowed; + private BlockOutputStreamBuilder() { } @@ -1278,5 +1287,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( ioStatisticsAggregator = value; return this; } + + public BlockOutputStreamBuilder withMultipartAllowed( + final boolean value) { + isMultipartAllowed = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 79c19c7cdf622..3f2bc451d3b78 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1831,6 +1831,11 @@ private FSDataOutputStream innerCreateFile( final PutObjectOptions putOptions = new PutObjectOptions(keep, null, options.getHeaders()); + if(!checkDiskBuffer(getConf())){ + throw new IOException("The filesystem conf is not " + + "proper for the output stream"); + } + final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() .withKey(destKey) @@ -1854,7 +1859,9 @@ private FSDataOutputStream innerCreateFile( .withCSEEnabled(isCSEEnabled) .withPutOptions(putOptions) .withIOStatisticsAggregator( - IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) + .withMultipartAllowed(getConf().getBoolean(ALLOW_MULTIPART_UPLOADS, + IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT)); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); From 542ffc2c011d52924e8dfb5363b814d57f566e4e Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Tue, 14 Mar 2023 15:34:51 +0530 Subject: [PATCH 04/17] MORE FIXES and Patches --- .../src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 1 + .../org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java | 3 +++ 2 files changed, 4 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3f2bc451d3b78..8d83ff7f3669d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -516,6 +516,7 @@ public void initialize(URI name, Configuration originalConf) maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); partSize = getMultipartSizeProperty(conf, MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + LOG.warn("Patcchhhh: The part size is : {}", partSize); multiPartThreshold = getMultipartSizeProperty(conf, MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java index de11ed1686569..46a7f070bdde4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -9,6 +9,8 @@ import java.io.IOException; +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; + public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ final private Logger LOG = LoggerFactory.getLogger( ITestS3AHugeFileUpload.class.getName()); @@ -18,6 +20,7 @@ public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ protected Configuration createScaleConfiguration() { Configuration configuration = super.createScaleConfiguration(); configuration.setBoolean(Constants.ALLOW_MULTIPART_UPLOADS, false); + configuration.setLong(MULTIPART_SIZE, 53687091200L); configuration.setInt(KEY_TEST_TIMEOUT, 36000); return configuration; } From 773e03ae1076f4ba5465ceb85e45f98535bd9a9d Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Tue, 14 Mar 2023 15:40:22 +0530 Subject: [PATCH 05/17] MORE FIXES and Patches --- .../org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java index 46a7f070bdde4..c7f9eae3aa5a2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -9,6 +9,7 @@ import java.io.IOException; +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ @@ -22,6 +23,7 @@ protected Configuration createScaleConfiguration() { configuration.setBoolean(Constants.ALLOW_MULTIPART_UPLOADS, false); configuration.setLong(MULTIPART_SIZE, 53687091200L); configuration.setInt(KEY_TEST_TIMEOUT, 36000); + configuration.setInt(IO_CHUNK_BUFFER_SIZE, 655360); return configuration; } From 58a04530b0ad7629b57e688aceac241c2245a9a5 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Tue, 14 Mar 2023 17:56:21 +0530 Subject: [PATCH 06/17] MORE FIXES --- .../java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 6a9a450ed6dae..1339a076b1973 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -204,7 +204,7 @@ class S3ABlockOutputStream extends OutputStream implements LOG.debug("Initialized S3ABlockOutputStream for {}" + " output to {}", key, activeBlock); this.isMultipartAllowed = builder.isMultipartAllowed; - if (putTracker.initialize() && this.isMultipartAllowed) { + if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); } @@ -373,6 +373,9 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { + if (!isMultipartAllowed){ + return; + } if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); multiPartUpload = new MultiPartUpload(key); From 5d3f9d9d443dbfa7af67f98cfc892caf475b194a Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Wed, 15 Mar 2023 14:55:08 +0530 Subject: [PATCH 07/17] MORE FIXES FOR TEST --- .../org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java index c7f9eae3aa5a2..fa5d4ab2faf55 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -24,6 +24,7 @@ protected Configuration createScaleConfiguration() { configuration.setLong(MULTIPART_SIZE, 53687091200L); configuration.setInt(KEY_TEST_TIMEOUT, 36000); configuration.setInt(IO_CHUNK_BUFFER_SIZE, 655360); + configuration.set("fs.s3a.connection.request.timeout", "1h"); return configuration; } From a2d25f629c809071990645855774528e52103cfe Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Wed, 15 Mar 2023 15:17:05 +0530 Subject: [PATCH 08/17] Added License Doc --- .../fs/s3a/scale/ITestS3AHugeFileUpload.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java index fa5d4ab2faf55..f7f0115cb4454 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; From f381b88a9260facd94616e0cf62c426b44575f45 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 27 Mar 2023 18:24:18 +0530 Subject: [PATCH 09/17] Review Fixes --- .../src/main/java/org/apache/hadoop/fs/s3a/Constants.java | 4 ++-- .../org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 6 +++--- .../main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 2 +- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 6 ++---- .../src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java | 8 ++++---- .../org/apache/hadoop/fs/s3a/WriteOperationHelper.java | 2 -- .../hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java | 2 +- 7 files changed, 13 insertions(+), 17 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index b71b319462b6e..07b829a4164d9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1256,7 +1256,7 @@ private Constants() { public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count"; public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8; - public static final String ALLOW_MULTIPART_UPLOADS = "fs.s3a.allow.multipart.uploads"; + public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled"; - public static final boolean IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT = true; + public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 1339a076b1973..be0c50bbd8eec 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -170,7 +170,7 @@ class S3ABlockOutputStream extends OutputStream implements private final IOStatisticsAggregator threadIOStatisticsAggregator; /**Is multipart upload allowed? 
*/ - private final boolean isMultipartAllowed; + private final boolean isMultipartEnabled; /** * An S3A output stream which uploads partitions in a separate pool of @@ -203,7 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements createBlockIfNeeded(); LOG.debug("Initialized S3ABlockOutputStream for {}" + " output to {}", key, activeBlock); - this.isMultipartAllowed = builder.isMultipartAllowed; + this.isMultipartEnabled = builder.isMultipartAllowed; if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); @@ -373,7 +373,7 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { - if (!isMultipartAllowed){ + if (!isMultipartEnabled){ return; } if (multiPartUpload == null) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 518c2277623ba..e7f5a27c65469 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -569,7 +569,7 @@ class ByteBufferBlock extends DataBlock { super(index, statistics); this.bufferSize = bufferSize > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) bufferSize; - blockBuffer = requestBuffer((int) bufferSize); + blockBuffer = requestBuffer(this.bufferSize); blockAllocated(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 8d83ff7f3669d..38a72db2a94e3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -516,7 +516,6 @@ public void initialize(URI name, Configuration originalConf) maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); partSize = getMultipartSizeProperty(conf, MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - LOG.warn("Patcchhhh: The part size is : {}", partSize); multiPartThreshold = getMultipartSizeProperty(conf, MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); @@ -596,7 +595,6 @@ public void initialize(URI name, Configuration originalConf) } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); - //partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); @@ -1861,8 +1859,8 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartAllowed(getConf().getBoolean(ALLOW_MULTIPART_UPLOADS, - IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT)); + .withMultipartAllowed(getConf().getBoolean( + MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT)); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index eac2a182173d4..25e323b372db8 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1032,11 +1032,11 @@ public static long getMultipartSizeProperty(Configuration conf, } public static boolean checkDiskBuffer(Configuration conf){ - boolean isAllowedMultipart = conf.getBoolean(ALLOW_MULTIPART_UPLOADS, - IS_ALLOWED_MULTIPART_UPLOADS_DEFAULT); - if (isAllowedMultipart) { + boolean isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + MULTIPART_UPLOAD_ENABLED_DEFAULT); + if (isMultipartEnabled) { return true; - } else if (!isAllowedMultipart && conf.get(FAST_UPLOAD_BUFFER) + } else if (!isMultipartEnabled && conf.get(FAST_UPLOAD_BUFFER) .equals(FAST_UPLOAD_BUFFER_DISK)){ return true; } else { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index c394a8031d094..7f9db33157f6d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,8 +269,6 @@ public PutObjectRequest createPutObjectRequest( String dest, File sourceFile, final PutObjectOptions options) { - //Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, - // "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java index f7f0115cb4454..f344d38db444b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java @@ -38,7 +38,7 @@ public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ @Override protected Configuration createScaleConfiguration() { Configuration configuration = super.createScaleConfiguration(); - configuration.setBoolean(Constants.ALLOW_MULTIPART_UPLOADS, false); + configuration.setBoolean(Constants.MULTIPART_UPLOADS_ENABLED, false); configuration.setLong(MULTIPART_SIZE, 53687091200L); configuration.setInt(KEY_TEST_TIMEOUT, 36000); configuration.setInt(IO_CHUNK_BUFFER_SIZE, 655360); From ca725f9fc8d382bb9fe7ab8257bc318fd4aeb5e6 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Thu, 30 Mar 2023 16:15:32 +0530 Subject: [PATCH 10/17] Review Fixes --- .../apache/hadoop/fs/StreamCapabilities.java | 5 +++ .../org/apache/hadoop/fs/s3a/Constants.java | 9 +++++ .../hadoop/fs/s3a/S3ABlockOutputStream.java | 27 ++++++------- .../apache/hadoop/fs/s3a/S3ADataBlocks.java | 2 +- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 13 ++++++- .../site/markdown/tools/hadoop-aws/index.md | 4 +- ...a => ITestS3AHugeFileUploadSinglePut.java} | 39 ++++++++++++++----- 8 files changed, 73 insertions(+), 28 deletions(-) rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/{ITestS3AHugeFileUpload.java => ITestS3AHugeFileUploadSinglePut.java} (62%) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 93ed57ef83057..721cd2e4ee63a 100644 --- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -99,6 +99,11 @@ public interface StreamCapabilities { */ String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported"; + /** + * Stream support multipart uploads to the given patch + */ + String MULTIPART_SUPPORTED = "fs.capability.multipart.supported"; + /** * Capabilities that a stream can support and be queried for. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 07b829a4164d9..68aa140691935 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1256,7 +1256,16 @@ private Constants() { public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count"; public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8; + /** + * Option to enable or disable the multipart uploads. + *

+   * Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}.
+   */
   public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";
 
+  /**
+   * Default value for multipart uploads.
+   * {@value}
+   */
   public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true;
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index be0c50bbd8eec..1bebbb3882d90 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -169,7 +169,7 @@ class S3ABlockOutputStream extends OutputStream implements
   /** Thread level IOStatistics Aggregator. */
   private final IOStatisticsAggregator threadIOStatisticsAggregator;
 
-  /**Is multipart upload allowed? */
+  /** Is multipart upload enabled? */
   private final boolean isMultipartEnabled;
 
   /**
@@ -203,7 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements
     createBlockIfNeeded();
     LOG.debug("Initialized S3ABlockOutputStream for {}" +
         " output to {}", key, activeBlock);
-    this.isMultipartEnabled = builder.isMultipartAllowed;
+    this.isMultipartEnabled = builder.isMultipartEnabled;
     if (putTracker.initialize()) {
       LOG.debug("Put tracker requests multipart upload");
       initMultipartUpload();
@@ -373,9 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
    */
   @Retries.RetryTranslated
   private void initMultipartUpload() throws IOException {
-    if (!isMultipartEnabled){
-      return;
-    }
+    Preconditions.checkState(isMultipartEnabled,
+        "multipart upload is disabled");
     if (multiPartUpload == null) {
       LOG.debug("Initiating Multipart upload");
       multiPartUpload = new MultiPartUpload(key);
@@ -568,12 +567,11 @@ public String toString() {
    * Upload the current block as a single PUT request; if the buffer is empty a
    * 0-byte PUT will be invoked, as it is needed to create an entry at the far
    * end.
-   *
    * @return number of bytes uploaded. If thread was interrupted while waiting
-   * for upload to complete, returns zero with interrupted flag set on this
-   * thread.
+   *         for upload to complete, returns zero with interrupted flag set on this
+   *         thread.
    * @throws IOException
-   * any problem.
+   *           any problem.
    */
   private long putObject() throws IOException {
     LOG.debug("Executing regular upload for {}", writeOperationHelper);
@@ -692,6 +690,9 @@ public boolean hasCapability(String capability) {
     case StreamCapabilities.IOSTATISTICS_CONTEXT:
       return true;
 
+    case StreamCapabilities.MULTIPART_SUPPORTED:
+      return isMultipartEnabled;
+
     default:
       return false;
     }
@@ -1136,9 +1137,9 @@ public static final class BlockOutputStreamBuilder {
     private IOStatisticsAggregator ioStatisticsAggregator;
 
     /**
-     * Is Multipart Uploads enabled for the given upload
+     * Is Multipart Uploads enabled for the given upload.
*/ - private boolean isMultipartAllowed; + private boolean isMultipartEnabled; private BlockOutputStreamBuilder() { } @@ -1291,9 +1292,9 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( return this; } - public BlockOutputStreamBuilder withMultipartAllowed( + public BlockOutputStreamBuilder withMultipartEnabled( final boolean value) { - isMultipartAllowed = value; + isMultipartEnabled = value; return this; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index e7f5a27c65469..c20690b9ceb28 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -440,7 +440,7 @@ static class ByteArrayBlock extends DataBlock { BlockOutputStreamStatistics statistics) { super(index, statistics); this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; - buffer = new S3AByteArrayOutputStream((int) limit); + buffer = new S3AByteArrayOutputStream(this.limit); blockAllocated(); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 38a72db2a94e3..27375e4d88fc3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -1859,7 +1859,7 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartAllowed(getConf().getBoolean( + .withMultipartEnabled(getConf().getBoolean( MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT)); return new FSDataOutputStream( new S3ABlockOutputStream(builder), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 25e323b372db8..447a6f9c003e2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1031,13 +1031,22 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } - public static boolean checkDiskBuffer(Configuration conf){ + /** + * Check whether the configuration for S3ABlockOutputStream is + * consistent or not. Multipart uploads allow all kinds of fast buffers to + * be supported. When the option is disabled only disk buffers are allowed to + * be used as the file size might be bigger than the buffer size that can be + * allocated. 
+ * @param conf + * @return + */ + public static boolean checkDiskBuffer(Configuration conf) { boolean isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT); if (isMultipartEnabled) { return true; } else if (!isMultipartEnabled && conf.get(FAST_UPLOAD_BUFFER) - .equals(FAST_UPLOAD_BUFFER_DISK)){ + .equals(FAST_UPLOAD_BUFFER_DISK)) { return true; } else { return false; diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index cd7793bfa92d3..e8ebf2c2a20f1 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1726,7 +1726,9 @@ The "fast" output stream 1. Uploads large files as blocks with the size set by `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads - begin and the size of each upload are identical. + begin and the size of each upload are identical. This behavior can be enabled + or disabled by using the flag `fs.s3a.multipart.uploads.enabled` which by + default is set to true. 1. Buffers blocks to disk (default) or in on-heap or off-heap memory. 1. Uploads blocks in parallel in background threads. 1. Begins uploading blocks as soon as the buffered data exceeds this partition diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java similarity index 62% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java index f344d38db444b..4d33718b06bb7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUpload.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java @@ -18,21 +18,32 @@ package org.apache.hadoop.fs.s3a.scale; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.s3a.Constants; -import org.junit.Test; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.junit.Test; -import java.io.IOException; - +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.Constants; import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; +import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS; -public class ITestS3AHugeFileUpload extends S3AScaleTestBase{ +/** + * Test a file upload using a single PUT operation. Multipart uploads will + * be disabled in the test. 
+ */ +public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase{ final private Logger LOG = LoggerFactory.getLogger( - ITestS3AHugeFileUpload.class.getName()); + ITestS3AHugeFileUploadSinglePut.class.getName()); private long fileSize = Integer.MAX_VALUE * 2L; @Override @@ -42,7 +53,9 @@ protected Configuration createScaleConfiguration() { configuration.setLong(MULTIPART_SIZE, 53687091200L); configuration.setInt(KEY_TEST_TIMEOUT, 36000); configuration.setInt(IO_CHUNK_BUFFER_SIZE, 655360); - configuration.set("fs.s3a.connection.request.timeout", "1h"); + configuration.set(REQUEST_TIMEOUT, "1h"); + fileSize = getTestPropertyBytes(configuration, KEY_HUGE_FILESIZE, + DEFAULT_HUGE_FILESIZE); return configuration; } @@ -50,6 +63,12 @@ protected Configuration createScaleConfiguration() { public void uploadFileSinglePut() throws IOException { LOG.info("Creating file with size : {}", fileSize); ContractTestUtils.createAndVerifyFile(getFileSystem(), - getTestPath(), fileSize ); + getTestPath(), fileSize); + Map stats = IOStatisticsSupport.snapshotIOStatistics().counters(); + LOG.warn("Patch: " + stats.toString()); + stats = IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics().counters(); + LOG.warn("Patch:{}", stats.toString()); + assertEquals(2L, + (long) stats.get(OBJECT_PUT_REQUESTS.getSymbol())); } } From ea0007fa8ac7aba823faab608539cf133cb27323 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 3 Apr 2023 16:25:40 +0530 Subject: [PATCH 11/17] Review Fixes with the test --- .../ITestS3AHugeFileUploadSinglePut.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java index 4d33718b06bb7..a2eaa383589da 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java @@ -19,18 +19,16 @@ package org.apache.hadoop.fs.s3a.scale; import java.io.IOException; -import java.util.Map; -import java.util.Optional; -import org.apache.hadoop.fs.statistics.IOStatisticsContext; -import org.apache.hadoop.fs.statistics.IOStatisticsSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.junit.Test; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.Constants; + import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE; import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT; @@ -62,13 +60,14 @@ protected Configuration createScaleConfiguration() { @Test public void uploadFileSinglePut() throws IOException { LOG.info("Creating file with size : {}", fileSize); - ContractTestUtils.createAndVerifyFile(getFileSystem(), + S3AFileSystem fs = getFileSystem(); + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), fileSize); - Map stats = IOStatisticsSupport.snapshotIOStatistics().counters(); - LOG.warn("Patch: " + stats.toString()); - stats = IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics().counters(); - LOG.warn("Patch:{}", stats.toString()); - assertEquals(2L, - (long) stats.get(OBJECT_PUT_REQUESTS.getSymbol())); + 
//No more than three put requests should be made during the upload of the file + //First one being the creation of test/ directory marker + //Second being the creation of the file with tests3ascale/ + //Third being the creation of directory marker tests3ascale/ on the file delete + assertEquals(3L, + (long) fs.getIOStatistics().counters().get(OBJECT_PUT_REQUESTS.getSymbol())); } } From 1f56e2a27ba37e1896ccf9b86e346278d149f617 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Tue, 4 Apr 2023 16:25:26 +0530 Subject: [PATCH 12/17] File Fixes --- .../org/apache/hadoop/fs/StreamCapabilities.java | 2 +- .../java/org/apache/hadoop/fs/s3a/Constants.java | 1 + .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 14 ++++++++++---- .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 4 ++-- .../s3a/scale/ITestS3AHugeFileUploadSinglePut.java | 5 +++-- 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 721cd2e4ee63a..6df8e3e22fdfb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -100,7 +100,7 @@ public interface StreamCapabilities { String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported"; /** - * Stream support multipart uploads to the given patch + * Stream supports multipart uploads to the given path. */ String MULTIPART_SUPPORTED = "fs.capability.multipart.supported"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 68aa140691935..c884571d8a6f7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1258,6 +1258,7 @@ private Constants() { /** * Option to enable or disable the multipart uploads. + * Value: {@value}. *

* Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}. */ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 27375e4d88fc3..4184a2e2e1503 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * Is this S3A FS instance has multipart uploads enabled? + */ + private boolean isMultipartEnabled; + /** * A cache of files that should be deleted when the FileSystem is closed * or the JVM is exited. @@ -533,6 +538,8 @@ public void initialize(URI name, Configuration originalConf) this.prefetchBlockSize = (int) prefetchBlockSizeLong; this.prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1); + this.isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + MULTIPART_UPLOAD_ENABLED_DEFAULT); initThreadPools(conf); @@ -1831,8 +1838,8 @@ private FSDataOutputStream innerCreateFile( new PutObjectOptions(keep, null, options.getHeaders()); if(!checkDiskBuffer(getConf())){ - throw new IOException("The filesystem conf is not " + - "proper for the output stream"); + throw new IOException("Unable to create OutputStream with the given" + + "multipart upload and buffer configuration."); } final S3ABlockOutputStream.BlockOutputStreamBuilder builder = @@ -1859,8 +1866,7 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(getConf().getBoolean( - MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT)); + .withMultipartEnabled(isMultipartEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 447a6f9c003e2..16f13d3b91b0a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1037,8 +1037,8 @@ public static long getMultipartSizeProperty(Configuration conf, * be supported. When the option is disabled only disk buffers are allowed to * be used as the file size might be bigger than the buffer size that can be * allocated. 
-   * @param conf
-   * @return
+   * @param conf : configuration object for the given context
+   * @return true if the disk buffer and the multipart settings are supported
    */
   public static boolean checkDiskBuffer(Configuration conf) {
     boolean isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
index a2eaa383589da..41673e57dbf55 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java
@@ -34,6 +34,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
 import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;

 /**
  * Test a file upload using a single PUT operation. Multipart uploads will
@@ -67,7 +68,7 @@ public void uploadFileSinglePut() throws IOException {
     //the first for the creation of the test/ directory marker,
     //the second for the creation of the file under tests3ascale/,
     //the third for the recreation of the tests3ascale/ directory marker when the file is deleted.
-    assertEquals(3L,
-        (long) fs.getIOStatistics().counters().get(OBJECT_PUT_REQUESTS.getSymbol()));
+    assertThatStatisticCounter(fs.getIOStatistics(), OBJECT_PUT_REQUESTS.getSymbol())
+        .isEqualTo(3);
   }
 }

From 4e922b4a81e4c6abc85d43267e74eb3b9827d197 Mon Sep 17 00:00:00 2001
From: HarshitGupta
Date: Thu, 6 Apr 2023 09:13:45 +0530
Subject: [PATCH 13/17] Minor Changes

---
 .../apache/hadoop/fs/s3a/S3AFileSystem.java   |  7 ++++-
 .../hadoop/fs/s3a/api/RequestFactory.java     |  3 +-
 .../fs/s3a/commit/AbstractS3ACommitter.java   |  4 +++
 .../fs/s3a/impl/RequestFactoryImpl.java       | 28 ++++++++++++++++++-
 .../fs/s3a/impl/TestRequestFactory.java       |  7 ++++-
 5 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 4184a2e2e1503..2c51ddf52d3bd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1087,6 +1087,7 @@ protected RequestFactory createRequestFactory() {
         .withRequestPreparer(getAuditManager()::requestCreated)
         .withContentEncoding(contentEncoding)
         .withStorageClass(storageClass)
+        .withMultipartEnabled(isMultipartEnabled)
         .build();
   }

@@ -1839,7 +1840,7 @@ private FSDataOutputStream innerCreateFile(

     if(!checkDiskBuffer(getConf())){
       throw new IOException("Unable to create OutputStream with the given"
-          + "multipart upload and buffer configuration.");
+          + " multipart upload and buffer configuration.");
     }

@@ -5424,4 +5425,8 @@ public RequestFactory getRequestFactory() {
   public boolean isCSEEnabled() {
     return isCSEEnabled;
   }
+
+  public boolean isMultipartEnabled() {
+    return isMultipartEnabled;
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index 
c08adab970370..d0ab0076d785d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -20,6 +20,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Optional; @@ -199,7 +200,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest( */ InitiateMultipartUploadRequest newMultipartUploadRequest( String destKey, - @Nullable PutObjectOptions options); + @Nullable PutObjectOptions options) throws IOException; /** * Complete a multipart upload. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index d6044edde29dd..3174eccde5d70 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,6 +217,10 @@ protected AbstractS3ACommitter( LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); + if (!fs.isMultipartEnabled()) { + throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," + + " the committer can't proceed."); + } // set this thread's context with the job ID. // audit spans created in this thread will pick // up this value., including the commit operations instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 56a1330a98c61..8158e4b219af1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final StorageClass storageClass; + /** + * Is Multipart Enabled + */ + private final boolean isMultipartEnabled; + /** * Constructor. * @param builder builder with all the configuration. @@ -137,6 +142,7 @@ protected RequestFactoryImpl( this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; + this.isMultipartEnabled = builder.isMultipartEnabled; } /** @@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest( @Override public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, - @Nullable final PutObjectOptions options) { + @Nullable final PutObjectOptions options) throws IOException { + if(!isMultipartEnabled){ + throw new IOException("Multipart uploads are disabled on the given filesystem."); + } final ObjectMetadata objectMetadata = newObjectMetadata(-1); maybeSetMetadata(options, objectMetadata); final InitiateMultipartUploadRequest initiateMPURequest = @@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder { */ private PrepareRequest requestPreparer; + /** + * Is Multipart Enabled on the path. 
+ */ + private boolean isMultipartEnabled = true; + private RequestFactoryBuilder() { } @@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer( this.requestPreparer = value; return this; } + + /** + * Multipart enabled + * + * @param value new value + * @return the builder + */ + public RequestFactoryBuilder withMultipartEnabled( + final boolean value) { + this.isMultipartEnabled = value; + return this; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 5c243bb820f02..9dff1106a064a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; @@ -173,7 +174,11 @@ private void createFactoryObjects(RequestFactory factory) { a(factory.newListObjectsV1Request(path, "/", 1)); a(factory.newListNextBatchOfObjectsRequest(new ObjectListing())); a(factory.newListObjectsV2Request(path, "/", 1)); - a(factory.newMultipartUploadRequest(path, null)); + try { + a(factory.newMultipartUploadRequest(path, null)); + } catch (IOException e) { + throw new RuntimeException(e); + } File srcfile = new File("/tmp/a"); a(factory.newPutObjectRequest(path, factory.newObjectMetadata(-1), null, srcfile)); From 13fc2d5d4f4ae17b7701a897969bdb8e0643f8bd Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Thu, 6 Apr 2023 15:13:36 +0530 Subject: [PATCH 14/17] Added tests for the committers --- .../ITestMagicCommitProtocolFailure.java | 57 ++++++++++++++++++ .../ITestStagingCommitProtocolFailure.java | 58 +++++++++++++++++++ 2 files changed, 115 insertions(+) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java new file mode 100644 index 0000000000000..fa8b8c4e2b30b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; + +public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, CommitConstants.COMMITTER_NAME_MAGIC); + return conf; + } + + @Test + public void testCreateCommitter() { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = getFileSystem().makeQualified( + new Path(getContract().getTestPath(), "/testpath")); + LOG.debug("{}", commitPath); + assertThrows(PathCommitException.class, + () -> new MagicS3GuardCommitter(commitPath, tContext)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java new file mode 100644 index 0000000000000..85d7a16b5707b --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; + +public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase { + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false); + conf.set(S3A_COMMITTER_FACTORY_KEY, CommitConstants.S3A_COMMITTER_FACTORY); + conf.set(FS_S3A_COMMITTER_NAME, InternalCommitterConstants.COMMITTER_NAME_STAGING); + return conf; + } + + @Test + public void testCreateCommitter() { + TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), + new TaskAttemptID()); + Path commitPath = getFileSystem().makeQualified( + new Path(getContract().getTestPath(), "/testpath")); + LOG.debug("{}", commitPath); + assertThrows(PathCommitException.class, + () -> new StagingCommitter(commitPath, tContext)); + } +} From 1476424088e2d20e5cf139dfb7e075d5000218d2 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 10 Apr 2023 09:55:44 +0530 Subject: [PATCH 15/17] Review Fixes --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 12 ++++++------ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 17 +++++++---------- .../org/apache/hadoop/fs/s3a/S3AUtils.java | 18 +++++++++++++++--- .../fs/s3a/commit/AbstractS3ACommitter.java | 2 +- .../hadoop/fs/s3a/impl/RequestFactoryImpl.java | 16 ++++++++-------- .../magic/ITestMagicCommitProtocolFailure.java | 7 ++++--- .../ITestStagingCommitProtocolFailure.java | 7 ++++--- .../hadoop/fs/s3a/impl/TestRequestFactory.java | 8 ++------ .../scale/ITestS3AHugeFileUploadSinglePut.java | 2 +- 9 files changed, 48 insertions(+), 41 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 1bebbb3882d90..660678feaa409 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -170,7 +170,7 @@ class S3ABlockOutputStream extends OutputStream implements private final IOStatisticsAggregator threadIOStatisticsAggregator; /** Is multipart upload enabled? 
*/ - private final boolean isMultipartEnabled; + private final boolean isMultipartUploadEnabled; /** * An S3A output stream which uploads partitions in a separate pool of @@ -203,7 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements createBlockIfNeeded(); LOG.debug("Initialized S3ABlockOutputStream for {}" + " output to {}", key, activeBlock); - this.isMultipartEnabled = builder.isMultipartEnabled; + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); @@ -373,7 +373,7 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { - Preconditions.checkState(!isMultipartEnabled, + Preconditions.checkState(!isMultipartUploadEnabled, "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); @@ -691,7 +691,7 @@ public boolean hasCapability(String capability) { return true; case StreamCapabilities.MULTIPART_SUPPORTED: - return isMultipartEnabled; + return isMultipartUploadEnabled; default: return false; @@ -1139,7 +1139,7 @@ public static final class BlockOutputStreamBuilder { /** * Is Multipart Uploads enabled for the given upload. */ - private boolean isMultipartEnabled; + private boolean isMultipartUploadEnabled; private BlockOutputStreamBuilder() { } @@ -1294,7 +1294,7 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( public BlockOutputStreamBuilder withMultipartEnabled( final boolean value) { - isMultipartEnabled = value; + isMultipartUploadEnabled = value; return this; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 2c51ddf52d3bd..1680465d711f2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -417,7 +417,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, /** * Is this S3A FS instance has multipart uploads enabled? 
*/ - private boolean isMultipartEnabled; + private boolean isMultipartUploadEnabled; /** * A cache of files that should be deleted when the FileSystem is closed @@ -538,7 +538,7 @@ public void initialize(URI name, Configuration originalConf) this.prefetchBlockSize = (int) prefetchBlockSizeLong; this.prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1); - this.isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT); initThreadPools(conf); @@ -1087,7 +1087,7 @@ protected RequestFactory createRequestFactory() { .withRequestPreparer(getAuditManager()::requestCreated) .withContentEncoding(contentEncoding) .withStorageClass(storageClass) - .withMultipartEnabled(isMultipartEnabled) + .withMultipartUploadEnabled(isMultipartUploadEnabled) .build(); } @@ -1838,10 +1838,7 @@ private FSDataOutputStream innerCreateFile( final PutObjectOptions putOptions = new PutObjectOptions(keep, null, options.getHeaders()); - if(!checkDiskBuffer(getConf())){ - throw new IOException("Unable to create OutputStream with the given" - + " multipart upload and buffer configuration."); - } + validateOutputStreamConfiguration(getConf()); final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() @@ -1867,7 +1864,7 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(isMultipartEnabled); + .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); @@ -5426,7 +5423,7 @@ public boolean isCSEEnabled() { return isCSEEnabled; } - public boolean isMultipartEnabled() { - return isMultipartEnabled; + public boolean isMultipartUploadEnabled() { + return isMultipartUploadEnabled; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 16f13d3b91b0a..cd44f8a8e4f63 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1031,6 +1031,18 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } + /** + * Validates the output stream configuration + * @param conf : configuration object for the given context + * @throws IOException : throws an IOException on config mismatch + */ + public static void validateOutputStreamConfiguration(Configuration conf) throws IOException { + if(!checkDiskBuffer(conf)){ + throw new IOException("Unable to create OutputStream with the given" + + " multipart upload and buffer configuration."); + } + } + /** * Check whether the configuration for S3ABlockOutputStream is * consistent or not. 
Multipart uploads allow all kinds of fast buffers to @@ -1041,11 +1053,11 @@ public static long getMultipartSizeProperty(Configuration conf, * @return true if the disk buffer and the multipart settings are supported */ public static boolean checkDiskBuffer(Configuration conf) { - boolean isMultipartEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT); - if (isMultipartEnabled) { + if (isMultipartUploadEnabled) { return true; - } else if (!isMultipartEnabled && conf.get(FAST_UPLOAD_BUFFER) + } else if (!isMultipartUploadEnabled && conf.get(FAST_UPLOAD_BUFFER) .equals(FAST_UPLOAD_BUFFER_DISK)) { return true; } else { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index 3174eccde5d70..e53c690431ee0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,7 +217,7 @@ protected AbstractS3ACommitter( LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); - if (!fs.isMultipartEnabled()) { + if (!fs.isMultipartUploadEnabled()) { throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," + " the committer can't proceed."); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index 8158e4b219af1..554e5973ebb56 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -125,9 +125,9 @@ public class RequestFactoryImpl implements RequestFactory { private final StorageClass storageClass; /** - * Is Multipart Enabled + * Is multipart upload enabled. */ - private final boolean isMultipartEnabled; + private final boolean isMultipartUploadEnabled; /** * Constructor. @@ -142,7 +142,7 @@ protected RequestFactoryImpl( this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; - this.isMultipartEnabled = builder.isMultipartEnabled; + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; } /** @@ -467,7 +467,7 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest( public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, @Nullable final PutObjectOptions options) throws IOException { - if(!isMultipartEnabled){ + if (!isMultipartUploadEnabled) { throw new IOException("Multipart uploads are disabled on the given filesystem."); } final ObjectMetadata objectMetadata = newObjectMetadata(-1); @@ -694,7 +694,7 @@ public static final class RequestFactoryBuilder { /** * Is Multipart Enabled on the path. */ - private boolean isMultipartEnabled = true; + private boolean isMultipartUploadEnabled = true; private RequestFactoryBuilder() { } @@ -783,14 +783,14 @@ public RequestFactoryBuilder withRequestPreparer( } /** - * Multipart enabled + * Multipart upload enabled. 
* * @param value new value * @return the builder */ - public RequestFactoryBuilder withMultipartEnabled( + public RequestFactoryBuilder withMultipartUploadEnabled( final boolean value) { - this.isMultipartEnabled = value; + this.isMultipartUploadEnabled = value; return this; } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java index fa8b8c4e2b30b..e534a58dc312a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocolFailure.java @@ -32,6 +32,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class ITestMagicCommitProtocolFailure extends AbstractS3ATestBase { @@ -45,13 +46,13 @@ protected Configuration createConfiguration() { } @Test - public void testCreateCommitter() { + public void testCreateCommitter() throws Exception { TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), new TaskAttemptID()); Path commitPath = getFileSystem().makeQualified( new Path(getContract().getTestPath(), "/testpath")); - LOG.debug("{}", commitPath); - assertThrows(PathCommitException.class, + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, () -> new MagicS3GuardCommitter(commitPath, tContext)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java index 85d7a16b5707b..e1970348a0cab 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java @@ -34,6 +34,7 @@ import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase { @Override @@ -46,13 +47,13 @@ protected Configuration createConfiguration() { } @Test - public void testCreateCommitter() { + public void testCreateCommitter() throws Exception { TaskAttemptContext tContext = new TaskAttemptContextImpl(getConfiguration(), new TaskAttemptID()); Path commitPath = getFileSystem().makeQualified( new Path(getContract().getTestPath(), "/testpath")); - LOG.debug("{}", commitPath); - assertThrows(PathCommitException.class, + LOG.debug("Trying to create a committer on the path: {}", commitPath); + intercept(PathCommitException.class, () -> new StagingCommitter(commitPath, tContext)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index 9dff1106a064a..7c85142d4376d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -156,7 +156,7 @@ public T prepareRequest(final T t) { * Create objects through the factory. * @param factory factory */ - private void createFactoryObjects(RequestFactory factory) { + private void createFactoryObjects(RequestFactory factory) throws IOException { String path = "path"; String path2 = "path2"; String id = "1"; @@ -174,11 +174,7 @@ private void createFactoryObjects(RequestFactory factory) { a(factory.newListObjectsV1Request(path, "/", 1)); a(factory.newListNextBatchOfObjectsRequest(new ObjectListing())); a(factory.newListObjectsV2Request(path, "/", 1)); - try { - a(factory.newMultipartUploadRequest(path, null)); - } catch (IOException e) { - throw new RuntimeException(e); - } + a(factory.newMultipartUploadRequest(path, null)); File srcfile = new File("/tmp/a"); a(factory.newPutObjectRequest(path, factory.newObjectMetadata(-1), null, srcfile)); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java index 41673e57dbf55..0152262fe2cda 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFileUploadSinglePut.java @@ -44,7 +44,7 @@ public class ITestS3AHugeFileUploadSinglePut extends S3AScaleTestBase{ final private Logger LOG = LoggerFactory.getLogger( ITestS3AHugeFileUploadSinglePut.class.getName()); - private long fileSize = Integer.MAX_VALUE * 2L; + private long fileSize; @Override protected Configuration createScaleConfiguration() { Configuration configuration = super.createScaleConfiguration(); From f18c0cb9c0ca8baa6eb911f9c1c753124a8a9785 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Mon, 10 Apr 2023 15:48:21 +0530 Subject: [PATCH 16/17] Review Fixes for the Path Capabilities on S3 --- .../main/java/org/apache/hadoop/fs/StreamCapabilities.java | 5 ----- .../src/main/java/org/apache/hadoop/fs/s3a/Constants.java | 7 +++++++ .../org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 3 --- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 3 +++ 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 6df8e3e22fdfb..93ed57ef83057 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -99,11 +99,6 @@ public interface StreamCapabilities { */ String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported"; - /** - * Stream supports multipart uploads to the given path. - */ - String MULTIPART_SUPPORTED = "fs.capability.multipart.supported"; - /** * Capabilities that a stream can support and be queried for. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index c884571d8a6f7..8bc5f77e205a6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1269,4 +1269,11 @@ private Constants() { * {@value} */ public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true; + + /** + * Stream supports multipart uploads to the given path. + */ + public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = + "fs.s3a.capability.multipart.uploads.enabled"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 660678feaa409..c3691904ddb2b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -690,9 +690,6 @@ public boolean hasCapability(String capability) { case StreamCapabilities.IOSTATISTICS_CONTEXT: return true; - case StreamCapabilities.MULTIPART_SUPPORTED: - return isMultipartUploadEnabled; - default: return false; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 1680465d711f2..638b81b1e62cc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -5106,6 +5106,9 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE: return !keepDirectoryMarkers(path); + case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED: + return isMultipartUploadEnabled(); + // create file options case FS_S3A_CREATE_PERFORMANCE: case FS_S3A_CREATE_HEADER: From 7207fddd59f637e3bc05ae2603f9855457c049d8 Mon Sep 17 00:00:00 2001 From: HarshitGupta Date: Tue, 11 Apr 2023 16:52:53 +0530 Subject: [PATCH 17/17] Review Fixes for tests --- .../apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +- .../java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 5 ++--- .../main/java/org/apache/hadoop/fs/s3a/S3AUtils.java | 12 +++--------- .../org/apache/hadoop/fs/s3a/api/RequestFactory.java | 1 + .../src/site/markdown/tools/hadoop-aws/index.md | 5 +++-- .../magic/ITestMagicCommitProtocolFailure.java | 8 ++++++++ .../fs/s3a/commit/staging/StagingTestBase.java | 8 ++++++-- .../ITestStagingCommitProtocolFailure.java | 6 ++++++ .../s3a/scale/ITestS3AHugeFileUploadSinglePut.java | 9 +++++---- 9 files changed, 35 insertions(+), 21 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index c3691904ddb2b..5013ffe6ddce9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -373,7 +373,7 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { - 
Preconditions.checkState(!isMultipartUploadEnabled, + Preconditions.checkState(isMultipartUploadEnabled, "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 638b81b1e62cc..9d6dd0639d001 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -415,7 +415,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, private ArnResource accessPoint; /** - * Is this S3A FS instance has multipart uploads enabled? + * Is this S3A FS instance has multipart upload enabled? */ private boolean isMultipartUploadEnabled; @@ -540,7 +540,6 @@ public void initialize(URI name, Configuration originalConf) intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1); this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT); - initThreadPools(conf); int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); @@ -1864,7 +1863,7 @@ private FSDataOutputStream innerCreateFile( .withPutOptions(putOptions) .withIOStatisticsAggregator( IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) - .withMultipartEnabled(isMultipartUploadEnabled); + .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index cd44f8a8e4f63..07e5bd3b023a1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1032,7 +1032,7 @@ public static long getMultipartSizeProperty(Configuration conf, } /** - * Validates the output stream configuration + * Validates the output stream configuration. * @param conf : configuration object for the given context * @throws IOException : throws an IOException on config mismatch */ @@ -1055,14 +1055,8 @@ public static void validateOutputStreamConfiguration(Configuration conf) throws public static boolean checkDiskBuffer(Configuration conf) { boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT); - if (isMultipartUploadEnabled) { - return true; - } else if (!isMultipartUploadEnabled && conf.get(FAST_UPLOAD_BUFFER) - .equals(FAST_UPLOAD_BUFFER_DISK)) { - return true; - } else { - return false; - } + return isMultipartUploadEnabled + || FAST_UPLOAD_BUFFER_DISK.equals(conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER)); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index d0ab0076d785d..7fc6e2347a825 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -197,6 +197,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest( * @param destKey destination object key * @param options options for the request * @return the request. 
+   * @throws IOException if multipart uploads are disabled
    */
   InitiateMultipartUploadRequest newMultipartUploadRequest(
       String destKey,
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index e8ebf2c2a20f1..f8fbd8aca12b1 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1,3 +1,4 @@
+