Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public interface StreamCapabilities {
*/
String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";

/**
* Stream supports multipart uploads to the given path.
*/
String MULTIPART_SUPPORTED = "fs.capability.multipart.supported";

/**
* Capabilities that a stream can support and be queried for.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,4 +1255,17 @@ private Constants() {
*/
public static final String PREFETCH_BLOCK_COUNT_KEY = "fs.s3a.prefetch.block.count";
public static final int PREFETCH_BLOCK_DEFAULT_COUNT = 8;

/**
* Option to enable or disable multipart uploads.
* <p>
* Default is {@link #MULTIPART_UPLOAD_ENABLED_DEFAULT}.
*/
public static final String MULTIPART_UPLOADS_ENABLED = "fs.s3a.multipart.uploads.enabled";

/**
* Default value for whether multipart uploads are enabled.
* {@value}
*/
public static final boolean MULTIPART_UPLOAD_ENABLED_DEFAULT = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class S3ABlockOutputStream extends OutputStream implements
private final String key;

/** Size of all blocks. */
private final int blockSize;
private final long blockSize;

/** IO Statistics. */
private final IOStatistics iostatistics;
Expand Down Expand Up @@ -169,6 +169,9 @@ class S3ABlockOutputStream extends OutputStream implements
/** Thread level IOStatistics Aggregator. */
private final IOStatisticsAggregator threadIOStatisticsAggregator;

/** Is multipart upload enabled? */
private final boolean isMultipartEnabled;

/**
* An S3A output stream which uploads partitions in a separate pool of
* threads; different {@link S3ADataBlocks.BlockFactory}
Expand All @@ -181,7 +184,7 @@ class S3ABlockOutputStream extends OutputStream implements
this.builder = builder;
this.key = builder.key;
this.blockFactory = builder.blockFactory;
this.blockSize = (int) builder.blockSize;
this.blockSize = builder.blockSize;
this.statistics = builder.statistics;
// test instantiations may not provide statistics;
this.iostatistics = statistics.getIOStatistics();
Expand All @@ -200,6 +203,7 @@ class S3ABlockOutputStream extends OutputStream implements
createBlockIfNeeded();
LOG.debug("Initialized S3ABlockOutputStream for {}" +
" output to {}", key, activeBlock);
this.isMultipartEnabled = builder.isMultipartEnabled;
if (putTracker.initialize()) {
LOG.debug("Put tracker requests multipart upload");
initMultipartUpload();
Expand Down Expand Up @@ -318,7 +322,7 @@ public synchronized void write(byte[] source, int offset, int len)
statistics.writeBytes(len);
S3ADataBlocks.DataBlock block = createBlockIfNeeded();
int written = block.write(source, offset, len);
int remainingCapacity = block.remainingCapacity();
int remainingCapacity = (int) block.remainingCapacity();
if (written < len) {
// not everything was written —the block has run out
// of capacity
Expand Down Expand Up @@ -369,6 +373,8 @@ private synchronized void uploadCurrentBlock(boolean isLast)
*/
@Retries.RetryTranslated
private void initMultipartUpload() throws IOException {
// Fail fast when a multipart upload is requested but the feature is
// disabled: checkState throws if its expression is false, so the
// expression must be the "enabled" flag itself, not its negation.
Preconditions.checkState(isMultipartEnabled,
    "multipart upload is disabled");
if (multiPartUpload == null) {
LOG.debug("Initiating Multipart upload");
multiPartUpload = new MultiPartUpload(key);
Expand Down Expand Up @@ -558,19 +564,20 @@ public String toString() {
}

/**
* Upload the current block as a single PUT request; if the buffer
* is empty a 0-byte PUT will be invoked, as it is needed to create an
* entry at the far end.
* @throws IOException any problem.
* @return number of bytes uploaded. If thread was interrupted while
* waiting for upload to complete, returns zero with interrupted flag set
* on this thread.
* Upload the current block as a single PUT request; if the buffer is empty a
* 0-byte PUT will be invoked, as it is needed to create an entry at the far
* end.
* @return number of bytes uploaded. If thread was interrupted while waiting
* for upload to complete, returns zero with interrupted flag set on this
* thread.
* @throws IOException
* any problem.
*/
private int putObject() throws IOException {
private long putObject() throws IOException {
LOG.debug("Executing regular upload for {}", writeOperationHelper);

final S3ADataBlocks.DataBlock block = getActiveBlock();
int size = block.dataSize();
long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
Expand Down Expand Up @@ -683,6 +690,9 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.IOSTATISTICS_CONTEXT:
return true;

case StreamCapabilities.MULTIPART_SUPPORTED:
return isMultipartEnabled;

default:
return false;
}
Expand Down Expand Up @@ -835,7 +845,7 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block,
Preconditions.checkNotNull(uploadId, "Null uploadId");
maybeRethrowUploadFailure();
partsSubmitted++;
final int size = block.dataSize();
final long size = block.dataSize();
bytesSubmitted += size;
final int currentPartNumber = partETagsFutures.size() + 1;
final UploadPartRequest request;
Expand Down Expand Up @@ -1011,7 +1021,7 @@ public void progressChanged(ProgressEvent progressEvent) {
ProgressEventType eventType = progressEvent.getEventType();
long bytesTransferred = progressEvent.getBytesTransferred();

int size = block.dataSize();
long size = block.dataSize();
switch (eventType) {

case REQUEST_BYTE_TRANSFER_EVENT:
Expand Down Expand Up @@ -1126,6 +1136,11 @@ public static final class BlockOutputStreamBuilder {
*/
private IOStatisticsAggregator ioStatisticsAggregator;

/**
* Are multipart uploads enabled for the given upload?
*/
private boolean isMultipartEnabled;

private BlockOutputStreamBuilder() {
}

Expand Down Expand Up @@ -1276,5 +1291,11 @@ public BlockOutputStreamBuilder withIOStatisticsAggregator(
ioStatisticsAggregator = value;
return this;
}

/**
 * Record whether multipart uploads are enabled for the stream being built.
 * @param value true if multipart uploads are enabled
 * @return this builder
 */
public BlockOutputStreamBuilder withMultipartEnabled(final boolean value) {
  this.isMultipartEnabled = value;
  return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected BlockFactory(S3AFileSystem owner) {
* @param statistics stats to work with
* @return a new block.
*/
abstract DataBlock create(long index, int limit,
abstract DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException;

Expand Down Expand Up @@ -258,7 +258,7 @@ final DestState getState() {
* Return the current data size.
* @return the size of the data
*/
abstract int dataSize();
abstract long dataSize();

/**
* Predicate to verify that the block has the capacity to write
Expand All @@ -280,7 +280,7 @@ boolean hasData() {
* The remaining capacity in the block before it is full.
* @return the number of bytes remaining.
*/
abstract int remainingCapacity();
abstract long remainingCapacity();

/**
* Write a series of bytes from the buffer, from the offset.
Expand Down Expand Up @@ -391,7 +391,7 @@ static class ArrayBlockFactory extends BlockFactory {
}

@Override
DataBlock create(long index, int limit,
DataBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
return new ByteArrayBlock(0, limit, statistics);
Expand Down Expand Up @@ -436,11 +436,11 @@ static class ByteArrayBlock extends DataBlock {
private Integer dataSize;

ByteArrayBlock(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.limit = limit;
buffer = new S3AByteArrayOutputStream(limit);
this.limit = (limit > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) limit;
buffer = new S3AByteArrayOutputStream(this.limit);
blockAllocated();
}

Expand All @@ -449,7 +449,7 @@ static class ByteArrayBlock extends DataBlock {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : buffer.size();
}

Expand All @@ -468,14 +468,14 @@ boolean hasCapacity(long bytes) {
}

@Override
int remainingCapacity() {
long remainingCapacity() {
return limit - dataSize();
}

@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
buffer.write(b, offset, written);
return written;
}
Expand Down Expand Up @@ -514,7 +514,7 @@ static class ByteBufferBlockFactory extends BlockFactory {
}

@Override
ByteBufferBlock create(long index, int limit,
ByteBufferBlock create(long index, long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
return new ByteBufferBlock(index, limit, statistics);
Expand Down Expand Up @@ -564,11 +564,12 @@ class ByteBufferBlock extends DataBlock {
* @param statistics statistics to update
*/
ByteBufferBlock(long index,
int bufferSize,
long bufferSize,
BlockOutputStreamStatistics statistics) {
super(index, statistics);
this.bufferSize = bufferSize;
blockBuffer = requestBuffer(bufferSize);
this.bufferSize = bufferSize > Integer.MAX_VALUE ?
Integer.MAX_VALUE : (int) bufferSize;
blockBuffer = requestBuffer(this.bufferSize);
blockAllocated();
}

Expand All @@ -577,7 +578,7 @@ class ByteBufferBlock extends DataBlock {
* @return the amount of data available to upload.
*/
@Override
int dataSize() {
long dataSize() {
return dataSize != null ? dataSize : bufferCapacityUsed();
}

Expand All @@ -598,7 +599,7 @@ public boolean hasCapacity(long bytes) {
}

@Override
public int remainingCapacity() {
public long remainingCapacity() {
return blockBuffer != null ? blockBuffer.remaining() : 0;
}

Expand All @@ -609,7 +610,7 @@ private int bufferCapacityUsed() {
@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
blockBuffer.put(b, offset, written);
return written;
}
Expand Down Expand Up @@ -809,7 +810,7 @@ static class DiskBlockFactory extends BlockFactory {
*/
@Override
DataBlock create(long index,
int limit,
long limit,
BlockOutputStreamStatistics statistics)
throws IOException {
File destFile = getOwner()
Expand All @@ -825,14 +826,14 @@ DataBlock create(long index,
*/
static class DiskBlock extends DataBlock {

private int bytesWritten;
private long bytesWritten;
private final File bufferFile;
private final int limit;
private final long limit;
private BufferedOutputStream out;
private final AtomicBoolean closed = new AtomicBoolean(false);

DiskBlock(File bufferFile,
int limit,
long limit,
long index,
BlockOutputStreamStatistics statistics)
throws FileNotFoundException {
Expand All @@ -844,7 +845,7 @@ static class DiskBlock extends DataBlock {
}

@Override
int dataSize() {
long dataSize() {
return bytesWritten;
}

Expand All @@ -854,14 +855,14 @@ boolean hasCapacity(long bytes) {
}

@Override
int remainingCapacity() {
long remainingCapacity() {
return limit - bytesWritten;
}

@Override
int write(byte[] b, int offset, int len) throws IOException {
super.write(b, offset, len);
int written = Math.min(remainingCapacity(), len);
int written = (int) Math.min(remainingCapacity(), len);
out.write(b, offset, written);
bytesWritten += written;
return written;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,6 @@ public void initialize(URI name, Configuration originalConf)
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
Expand Down Expand Up @@ -1831,6 +1830,11 @@ private FSDataOutputStream innerCreateFile(
final PutObjectOptions putOptions =
new PutObjectOptions(keep, null, options.getHeaders());

if(!checkDiskBuffer(getConf())){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

Copy link
Contributor

@mukund-thakur mukund-thakur Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just add a method validateOutputStreamConfiguration() and throw exception in the implementation only.

This is still pending. I don't really mind leaving it as it is but I think my suggestion is consistent with other parts of the code and is more readable.
CC @steveloughran

throw new IOException("The filesystem conf is not " +
"proper for the output stream");
}

final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
Expand All @@ -1854,7 +1858,9 @@ private FSDataOutputStream innerCreateFile(
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
.withMultipartEnabled(getConf().getBoolean(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the multipart enabled flag should be made a field and stored during initialize(), so we can save on scanning the conf map every time a file is created.

MULTIPART_UPLOADS_ENABLED, MULTIPART_UPLOAD_ENABLED_DEFAULT));
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
Expand Down
Loading