diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index e5b0a9b5aa163..8bc5f77e205a6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1255,4 +1255,25 @@ private Constants() { */ public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count"; public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8; + + /** + * Option to enable or disable the multipart uploads. + * Value: {@value}. + *

+ * Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}. + */ + public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled"; + + /** + * Default value for multipart uploads. + * {@value} + */ + public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true; + + /** + * Stream supports multipart uploads to the given path. + */ + public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED = + "fs.s3a.capability.multipart.uploads.enabled"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 19943ff2f70da..5013ffe6ddce9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements private final String key; /** Size of all blocks. */ - private final int blockSize; + private final long blockSize; /** IO Statistics. */ private final IOStatistics iostatistics; @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements /** Thread level IOStatistics Aggregator. */ private final IOStatisticsAggregator threadIOStatisticsAggregator; + /** Is multipart upload enabled? 
*/ + private final boolean isMultipartUploadEnabled; + /** * An S3A output stream which uploads partitions in a separate pool of * threads; different {@link S3ADataBlocks.BlockFactory} @@ -181,7 +184,7 @@ class S3ABlockOutputStream extends OutputStream implements this.builder = builder; this.key = builder.key; this.blockFactory = builder.blockFactory; - this.blockSize = (int) builder.blockSize; + this.blockSize = builder.blockSize; this.statistics = builder.statistics; // test instantiations may not provide statistics; this.iostatistics = statistics.getIOStatistics(); @@ -200,6 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements createBlockIfNeeded(); LOG.debug("Initialized S3ABlockOutputStream for {}" + " output to {}", key, activeBlock); + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; if (putTracker.initialize()) { LOG.debug("Put tracker requests multipart upload"); initMultipartUpload(); @@ -318,7 +322,7 @@ public synchronized void write(byte[] source, int offset, int len) statistics.writeBytes(len); S3ADataBlocks.DataBlock block = createBlockIfNeeded(); int written = block.write(source, offset, len); - int remainingCapacity = block.remainingCapacity(); + int remainingCapacity = (int) block.remainingCapacity(); if (written < len) { // not everything was written —the block has run out // of capacity @@ -369,6 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast) */ @Retries.RetryTranslated private void initMultipartUpload() throws IOException { + Preconditions.checkState(isMultipartUploadEnabled, + "multipart upload is disabled"); if (multiPartUpload == null) { LOG.debug("Initiating Multipart upload"); multiPartUpload = new MultiPartUpload(key); @@ -558,19 +564,20 @@ public String toString() { } /** - * Upload the current block as a single PUT request; if the buffer - * is empty a 0-byte PUT will be invoked, as it is needed to create an - * entry at the far end. - * @throws IOException any problem. 
- * @return number of bytes uploaded. If thread was interrupted while - * waiting for upload to complete, returns zero with interrupted flag set - * on this thread. + * Upload the current block as a single PUT request; if the buffer is empty a + * 0-byte PUT will be invoked, as it is needed to create an entry at the far + * end. + * @return number of bytes uploaded. If thread was interrupted while waiting + * for upload to complete, returns zero with interrupted flag set on this + * thread. + * @throws IOException + * any problem. */ - private int putObject() throws IOException { + private long putObject() throws IOException { LOG.debug("Executing regular upload for {}", writeOperationHelper); final S3ADataBlocks.DataBlock block = getActiveBlock(); - int size = block.dataSize(); + long size = block.dataSize(); final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); final PutObjectRequest putObjectRequest = uploadData.hasFile() ? writeOperationHelper.createPutObjectRequest( @@ -835,7 +842,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, Preconditions.checkNotNull(uploadId, "Null uploadId"); maybeRethrowUploadFailure(); partsSubmitted++; - final int size = block.dataSize(); + final long size = block.dataSize(); bytesSubmitted += size; final int currentPartNumber = partETagsFutures.size() + 1; final UploadPartRequest request; @@ -1011,7 +1018,7 @@ public void progressChanged(ProgressEvent progressEvent) { ProgressEventType eventType = progressEvent.getEventType(); long bytesTransferred = progressEvent.getBytesTransferred(); - int size = block.dataSize(); + long size = block.dataSize(); switch (eventType) { case REQUEST_BYTE_TRANSFER_EVENT: @@ -1126,6 +1133,11 @@ public static final class BlockOutputStreamBuilder { */ private IOStatisticsAggregator ioStatisticsAggregator; + /** + * Are multipart uploads enabled for the given upload? 
+ */ + private boolean isMultipartUploadEnabled; + private BlockOutputStreamBuilder() { } @@ -1276,5 +1288,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator( ioStatisticsAggregator = value; return this; } + + public BlockOutputStreamBuilder withMultipartEnabled( + final boolean value) { + isMultipartUploadEnabled = value; + return this; + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java index 03b5bd96162af..c20690b9ceb28 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) { * @param statistics stats to work with * @return a new block. */ - abstract DataBlock create(long index, int limit, + abstract DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException; @@ -258,7 +258,7 @@ final DestState getState() { * Return the current data size. * @return the size of the data */ - abstract int dataSize(); + abstract long dataSize(); /** * Predicate to verify that the block has the capacity to write @@ -280,7 +280,7 @@ boolean hasData() { * The remaining capacity in the block before it is full. * @return the number of bytes remaining. */ - abstract int remainingCapacity(); + abstract long remainingCapacity(); /** * Write a series of bytes from the buffer, from the offset. 
@@ -391,7 +391,7 @@ static class ArrayBlockFactory extends BlockFactory { } @Override - DataBlock create(long index, int limit, + DataBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { return new ByteArrayBlock(0, limit, statistics); @@ -436,11 +436,11 @@ static class ByteArrayBlock extends DataBlock { private Integer dataSize; ByteArrayBlock(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.limit = limit; - buffer = new S3AByteArrayOutputStream(limit); + this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit; + buffer = new S3AByteArrayOutputStream(this.limit); blockAllocated(); } @@ -449,7 +449,7 @@ static class ByteArrayBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : buffer.size(); } @@ -468,14 +468,14 @@ boolean hasCapacity(long bytes) { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - dataSize(); } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); buffer.write(b, offset, written); return written; } @@ -514,7 +514,7 @@ static class ByteBufferBlockFactory extends BlockFactory { } @Override - ByteBufferBlock create(long index, int limit, + ByteBufferBlock create(long index, long limit, BlockOutputStreamStatistics statistics) throws IOException { return new ByteBufferBlock(index, limit, statistics); @@ -564,11 +564,12 @@ class ByteBufferBlock extends DataBlock { * @param statistics statistics to update */ ByteBufferBlock(long index, - int bufferSize, + long bufferSize, BlockOutputStreamStatistics statistics) { super(index, statistics); - this.bufferSize = bufferSize; - blockBuffer = requestBuffer(bufferSize); + this.bufferSize = 
bufferSize > Integer.MAX_VALUE ? + Integer.MAX_VALUE : (int) bufferSize; + blockBuffer = requestBuffer(this.bufferSize); blockAllocated(); } @@ -577,7 +578,7 @@ class ByteBufferBlock extends DataBlock { * @return the amount of data available to upload. */ @Override - int dataSize() { + long dataSize() { return dataSize != null ? dataSize : bufferCapacityUsed(); } @@ -598,7 +599,7 @@ public boolean hasCapacity(long bytes) { } @Override - public int remainingCapacity() { + public long remainingCapacity() { return blockBuffer != null ? blockBuffer.remaining() : 0; } @@ -609,7 +610,7 @@ private int bufferCapacityUsed() { @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); blockBuffer.put(b, offset, written); return written; } @@ -809,7 +810,7 @@ static class DiskBlockFactory extends BlockFactory { */ @Override DataBlock create(long index, - int limit, + long limit, BlockOutputStreamStatistics statistics) throws IOException { File destFile = getOwner() @@ -825,14 +826,14 @@ DataBlock create(long index, */ static class DiskBlock extends DataBlock { - private int bytesWritten; + private long bytesWritten; private final File bufferFile; - private final int limit; + private final long limit; private BufferedOutputStream out; private final AtomicBoolean closed = new AtomicBoolean(false); DiskBlock(File bufferFile, - int limit, + long limit, long index, BlockOutputStreamStatistics statistics) throws FileNotFoundException { @@ -844,7 +845,7 @@ static class DiskBlock extends DataBlock { } @Override - int dataSize() { + long dataSize() { return bytesWritten; } @@ -854,14 +855,14 @@ boolean hasCapacity(long bytes) { } @Override - int remainingCapacity() { + long remainingCapacity() { return limit - bytesWritten; } @Override int write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); - int 
written = Math.min(remainingCapacity(), len); + int written = (int) Math.min(remainingCapacity(), len); out.write(b, offset, written); bytesWritten += written; return written; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3e6f2322d3b00..9d6dd0639d001 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -414,6 +414,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * Does this S3A FS instance have multipart upload enabled? + */ + private boolean isMultipartUploadEnabled; + /** * A cache of files that should be deleted when the FileSystem is closed * or the JVM is exited. @@ -533,7 +538,8 @@ public void initialize(URI name, Configuration originalConf) this.prefetchBlockSize = (int) prefetchBlockSizeLong; this.prefetchBlockCount = intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1); - + this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + MULTIPART_UPLOAD_ENABLED_DEFAULT); initThreadPools(conf); int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); @@ -595,7 +601,6 @@ public void initialize(URI name, Configuration originalConf) } blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER); - partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); blockOutputActiveBlocks = intOption(conf, FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); @@ -1081,6 +1086,7 @@ protected RequestFactory createRequestFactory() { .withRequestPreparer(getAuditManager()::requestCreated) .withContentEncoding(contentEncoding) .withStorageClass(storageClass) + 
.withMultipartUploadEnabled(isMultipartUploadEnabled) .build(); } @@ -1831,6 +1837,8 @@ private FSDataOutputStream innerCreateFile( final PutObjectOptions putOptions = new PutObjectOptions(keep, null, options.getHeaders()); + validateOutputStreamConfiguration(getConf()); + final S3ABlockOutputStream.BlockOutputStreamBuilder builder = S3ABlockOutputStream.builder() .withKey(destKey) @@ -1854,7 +1862,8 @@ private FSDataOutputStream innerCreateFile( .withCSEEnabled(isCSEEnabled) .withPutOptions(putOptions) .withIOStatisticsAggregator( - IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()); + IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator()) + .withMultipartEnabled(isMultipartUploadEnabled); return new FSDataOutputStream( new S3ABlockOutputStream(builder), null); @@ -5096,6 +5105,9 @@ public boolean hasPathCapability(final Path path, final String capability) case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE: return !keepDirectoryMarkers(path); + case STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED: + return isMultipartUploadEnabled(); + // create file options case FS_S3A_CREATE_PERFORMANCE: case FS_S3A_CREATE_HEADER: @@ -5412,4 +5424,8 @@ public RequestFactory getRequestFactory() { public boolean isCSEEnabled() { return isCSEEnabled; } + + public boolean isMultipartUploadEnabled() { + return isMultipartUploadEnabled; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 46568ec2a8d3d..94ce7c70315c2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1505,7 +1505,7 @@ public void blockReleased() { * of block uploads pending (1) and the bytes pending (blockSize). 
*/ @Override - public void blockUploadQueued(int blockSize) { + public void blockUploadQueued(long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_BLOCK_UPLOADS); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING, blockSize); @@ -1518,7 +1518,7 @@ public void blockUploadQueued(int blockSize) { * {@code STREAM_WRITE_BLOCK_UPLOADS_ACTIVE}. */ @Override - public void blockUploadStarted(Duration timeInQueue, int blockSize) { + public void blockUploadStarted(Duration timeInQueue, long blockSize) { // the local counter is used in toString reporting. queueDuration.addAndGet(timeInQueue.toMillis()); // update the duration fields in the IOStatistics. @@ -1546,7 +1546,7 @@ private IOStatisticsStore localIOStatistics() { @Override public void blockUploadCompleted( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { transferDuration.addAndGet(timeSinceUploadStarted.toMillis()); incAllGauges(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1); blockUploadsCompleted.incrementAndGet(); @@ -1560,7 +1560,7 @@ public void blockUploadCompleted( @Override public void blockUploadFailed( Duration timeSinceUploadStarted, - int blockSize) { + long blockSize) { incCounter(StreamStatisticNames.STREAM_WRITE_EXCEPTIONS); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 8a1947f3e42dc..07e5bd3b023a1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -1031,6 +1031,34 @@ public static long getMultipartSizeProperty(Configuration conf, return partSize; } + /** + * Validates the output stream configuration. 
+ * @param conf : configuration object for the given context + * @throws IOException : throws an IOException on config mismatch + */ + public static void validateOutputStreamConfiguration(Configuration conf) throws IOException { + if(!checkDiskBuffer(conf)){ + throw new IOException("Unable to create OutputStream with the given" + + " multipart upload and buffer configuration."); + } + } + + /** + * Check whether the configuration for S3ABlockOutputStream is + * consistent or not. Multipart uploads allow all kinds of fast buffers to + * be supported. When the option is disabled only disk buffers are allowed to + * be used as the file size might be bigger than the buffer size that can be + * allocated. + * @param conf : configuration object for the given context + * @return true if the disk buffer and the multipart settings are supported + */ + public static boolean checkDiskBuffer(Configuration conf) { + boolean isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED, + MULTIPART_UPLOAD_ENABLED_DEFAULT); + return isMultipartUploadEnabled + || FAST_UPLOAD_BUFFER_DISK.equals(conf.get(FAST_UPLOAD_BUFFER, DEFAULT_FAST_UPLOAD_BUFFER)); + } + /** * Ensure that the long value is in the range of an integer. 
* @param name property name for error messages diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index 14ffeed4a55bb..7f9db33157f6d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -269,8 +269,6 @@ public PutObjectRequest createPutObjectRequest( String dest, File sourceFile, final PutObjectOptions options) { - Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE, - "File length is too big for a single PUT upload"); activateAuditSpan(); final ObjectMetadata objectMetadata = newObjectMetadata((int) sourceFile.length()); @@ -532,7 +530,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 321390446f705..32888314d881a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -233,7 +233,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, Long offset) throws IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index cae4d3ef034e8..7fc6e2347a825 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -20,6 +20,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Optional; @@ -196,10 +197,11 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest( * @param destKey destination object key * @param options options for the request * @return the request. + * @throws IOException if the multipart uploads are disabled */ InitiateMultipartUploadRequest newMultipartUploadRequest( String destKey, - @Nullable PutObjectOptions options); + @Nullable PutObjectOptions options) throws IOException; /** * Complete a multipart upload. @@ -248,7 +250,7 @@ UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java index d6044edde29dd..e53c690431ee0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java @@ -217,6 +217,10 @@ protected AbstractS3ACommitter( LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", role, jobName(context), jobIdString(context), outputPath); S3AFileSystem fs = getDestS3AFS(); + if (!fs.isMultipartUploadEnabled()) { + throw new PathCommitException(outputPath, "Multipart uploads are disabled for the FileSystem," + + " the committer can't proceed."); + } // set this thread's context with the job ID. 
// audit spans created in this thread will pick // up this value., including the commit operations instance diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index ce11df0383929..554e5973ebb56 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -124,6 +124,11 @@ public class RequestFactoryImpl implements RequestFactory { */ private final StorageClass storageClass; + /** + * Is multipart upload enabled. + */ + private final boolean isMultipartUploadEnabled; + /** * Constructor. * @param builder builder with all the configuration. @@ -137,6 +142,7 @@ protected RequestFactoryImpl( this.requestPreparer = builder.requestPreparer; this.contentEncoding = builder.contentEncoding; this.storageClass = builder.storageClass; + this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled; } /** @@ -460,7 +466,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest( @Override public InitiateMultipartUploadRequest newMultipartUploadRequest( final String destKey, - @Nullable final PutObjectOptions options) { + @Nullable final PutObjectOptions options) throws IOException { + if (!isMultipartUploadEnabled) { + throw new IOException("Multipart uploads are disabled on the given filesystem."); + } final ObjectMetadata objectMetadata = newObjectMetadata(-1); maybeSetMetadata(options, objectMetadata); final InitiateMultipartUploadRequest initiateMPURequest = @@ -509,7 +518,7 @@ public UploadPartRequest newUploadPartRequest( String destKey, String uploadId, int partNumber, - int size, + long size, InputStream uploadStream, File sourceFile, long offset) throws PathIOException { @@ -682,6 +691,11 @@ public static final class RequestFactoryBuilder { */ private PrepareRequest 
requestPreparer; + /** + * Is multipart upload enabled on the path? Defaults to true. + */ + private boolean isMultipartUploadEnabled = true; + private RequestFactoryBuilder() { } @@ -767,6 +781,18 @@ public RequestFactoryBuilder withRequestPreparer( this.requestPreparer = value; return this; } + + /** + * Set whether multipart upload is enabled. + * + * @param value new value + * @return the builder + */ + public RequestFactoryBuilder withMultipartUploadEnabled( + final boolean value) { + this.isMultipartUploadEnabled = value; + return this; + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java index bd1466b2a432f..554b628d003a4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/BlockOutputStreamStatistics.java @@ -32,21 +32,21 @@ public interface BlockOutputStreamStatistics extends Closeable, * Block is queued for upload. * @param blockSize block size. */ - void blockUploadQueued(int blockSize); + void blockUploadQueued(long blockSize); /** * Queued block has been scheduled for upload. * @param timeInQueue time in the queue. * @param blockSize block size. */ - void blockUploadStarted(Duration timeInQueue, int blockSize); + void blockUploadStarted(Duration timeInQueue, long blockSize); /** * A block upload has completed. Duration excludes time in the queue. * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadCompleted(Duration timeSinceUploadStarted, int blockSize); + void blockUploadCompleted(Duration timeSinceUploadStarted, long blockSize); /** * A block upload has failed. Duration excludes time in the queue. 
@@ -57,7 +57,7 @@ public interface BlockOutputStreamStatistics extends Closeable, * @param timeSinceUploadStarted time in since the transfer began. * @param blockSize block size */ - void blockUploadFailed(Duration timeSinceUploadStarted, int blockSize); + void blockUploadFailed(Duration timeSinceUploadStarted, long blockSize); /** * Intermediate report of bytes uploaded. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index d10b6484175b1..6454065b240c6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -442,22 +442,22 @@ private static final class EmptyBlockOutputStreamStatistics implements BlockOutputStreamStatistics { @Override - public void blockUploadQueued(final int blockSize) { + public void blockUploadQueued(final long blockSize) { } @Override public void blockUploadStarted(final Duration timeInQueue, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadCompleted(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override public void blockUploadFailed(final Duration timeSinceUploadStarted, - final int blockSize) { + final long blockSize) { } @Override diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index cd7793bfa92d3..f8fbd8aca12b1 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -1,3 +1,4 @@ +