Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.7.0-beta.1 (Unreleased)
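- Added a maxConcurrency option on ParallelTransferOptions that allows the customer to limit how many concurrent network requests will be outstanding at once. The limit applies to each individual transfer operation (a single upload or download call), not globally across the client.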

@kasobol-msft kasobol-msft Jun 1, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that isn't clear (from the notes) is the scope of this limit. Judging by the implementation, I believe it applies per upload request. However, one might conclude that it's a global limit. We should make this clear.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I'll add that to the javadocs, too

- Fixed a bug that caused auth failures when constructing a client to a secondary endpoint using token auth.

## 12.6.1 (2020-05-06)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient,
Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
Map<String, String> metadata, AccessTier tier, BlobRequestConditions requestConditions) {
// TODO: Parallelism parameter? Or let Reactor handle it?
// TODO: Sample/api reference
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong();
Expand Down Expand Up @@ -392,7 +391,7 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
.map(x -> blockId)
.doFinally(x -> pool.returnBuffer(buffer))
.flux();
}) // TODO: parallelism?
}, parallelTransferOptions.getMaxConcurrency())
.collect(Collectors.toList())
.flatMap(ids ->
blockBlobAsyncClient.commitBlockListWithResponse(ids, headers, metadata, tier, requestConditions));
Expand Down Expand Up @@ -538,7 +537,7 @@ private Mono<Void> uploadFileChunks(long fileSize, ParallelTransferOptions paral

return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null,
finalRequestConditions.getLeaseId());
})
}, parallelTransferOptions.getMaxConcurrency())
.then(Mono.defer(() -> client.commitBlockListWithResponse(
new ArrayList<>(blockIds.values()), headers, metadata, tier, finalRequestConditions)))
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import reactor.util.concurrent.Queues;
Comment thread
alzimmermsft marked this conversation as resolved.
Outdated

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -44,7 +45,9 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
: other.getNumBuffers(),
other.getProgressReceiver(),
other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)
: other.getMaxSingleUploadSize());
: other.getMaxSingleUploadSize(),
// Queues.SMALL_BUFFER_SIZE is the default used by reactor
other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency());
Comment thread
alzimmermsft marked this conversation as resolved.
Outdated
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class ParallelTransferOptions {
private final Integer numBuffers;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;
private final Integer maxConcurrency;

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
Expand Down Expand Up @@ -62,7 +63,35 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
* {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}.
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
Integer maxSingleUploadSize) {
this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null);
}

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
*
* @param blockSize The block size.
* For upload, the block size is the size of each block that will be staged. This value also determines the number
* of requests that need to be made. If block size is large, upload will make fewer network calls, but each
* individual call will send more data and will therefore take longer. This parameter also determines the size
* that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may
* be up to blockSize * numBuffers.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
* @param progressReceiver {@link ProgressReceiver}
* @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
* single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
* ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests
* may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before
* any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the
* service accepts for uploading in a single request and is represented by
* {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}.
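* @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as part of a
* single transfer operation; the limit is per call, not global. May be null to accept default behavior.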
Comment thread
anuchandy marked this conversation as resolved.
Outdated
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize, Integer maxConcurrency) {
if (blockSize != null) {
StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
}
Expand All @@ -79,6 +108,8 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES);
}
this.maxSingleUploadSize = maxSingleUploadSize;

this.maxConcurrency = maxConcurrency;
}

/**
Expand Down Expand Up @@ -112,4 +143,12 @@ public ProgressReceiver getProgressReceiver() {
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}

/**
 * Returns the cap on how many requests may be in flight in parallel at any given time.
 *
 * @return The configured max concurrency, or {@code null} when no value was supplied at construction.
 */
public Integer getMaxConcurrency() {
    return maxConcurrency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne
.flatMap(response ->
writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock,
totalProgress));
})
}, finalParallelTransferOptions.getMaxConcurrency())
// Only the first download call returns a value.
.then(Mono.just(buildBlobPropertiesResponse(initialResponse)));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.SyncPoller;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.BlobServiceVersion;
Expand Down Expand Up @@ -728,8 +727,8 @@ public Response<Void> setMetadataWithResponse(Map<String, String> metadata, Blob
* <p>For more information, see the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/snapshot-blob">Azure Docs</a></p>
*
* @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use
* {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot.
* @return A {@link BlobClientBase} which is used to interact with the created snapshot; use
* {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot.
*/
public BlobClientBase createSnapshot() {
return createSnapshotWithResponse(null, null, null, Context.NONE).getValue();
Expand All @@ -750,8 +749,8 @@ public BlobClientBase createSnapshot() {
* @param requestConditions {@link BlobRequestConditions}
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use
* {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot.
* @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use
* {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot.
*/
public Response<BlobClientBase> createSnapshotWithResponse(Map<String, String> metadata,
BlobRequestConditions requestConditions, Duration timeout, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class ParallelTransferOptions {
private final Integer numBuffers;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;
private final Integer maxConcurrency;

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
Expand All @@ -42,13 +43,42 @@ public final class ParallelTransferOptions {
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null);
}

/**
 * Creates a new {@link ParallelTransferOptions}. Any argument may be {@code null}; null values are stored unchanged
 * so that a default can be substituted when the options are consumed.
 *
 * @param blockSize The block size.
 * For upload, the block size is the size of each block that will be staged. This value also determines the number
 * of requests that need to be made. This parameter also determines the size that each buffer uses when buffering
 * is required and consequently the amount of memory consumed by such methods may be up to blockSize * numBuffers.
 * For download to file, the block size is the size of each data chunk returned from the service.
 * For both applications, if the block size is large, the transfer will make fewer network calls, but each
 * individual call will send more data and will therefore take longer.
 * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
 * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
 * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
 * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
 * @param progressReceiver {@link ProgressReceiver}
 * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
 * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
 * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests
 * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before
 * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the
 * service accepts for uploading in a single request, which varies depending on the service.
 * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as part of a
 * single transfer operation. The limit applies per call, not globally: two transfers running at once may each have
 * up to this many requests outstanding. Must be at least one. May be null to accept default behavior.
 */
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
    Integer maxSingleUploadSize, Integer maxConcurrency) {
    this.blockSize = blockSize;

    if (numBuffers != null) {
        StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
    }
    this.numBuffers = numBuffers;

    this.progressReceiver = progressReceiver;
    this.maxSingleUploadSize = maxSingleUploadSize;

    // Reject a non-positive limit here, using the same bounds helper as numBuffers, instead of letting it
    // surface later when the value is handed to the reactive pipeline as its concurrency argument.
    if (maxConcurrency != null) {
        StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE);
    }
    this.maxConcurrency = maxConcurrency;
}

/**
Expand Down Expand Up @@ -82,4 +112,12 @@ public ProgressReceiver getProgressReceiver() {
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}

/**
 * Returns the upper bound on the number of requests that may be issued in parallel at any given time.
 *
 * @return The max concurrency value, or {@code null} if none was provided when these options were created.
 */
public Integer getMaxConcurrency() {
    return maxConcurrency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
.doFinally(x -> pool.returnBuffer(buffer))
.map(resp -> currentBufferLength + currentOffset) /* End of file after append to pass to flush. */
.flux();
})
}, parallelTransferOptions.getMaxConcurrency())
.last()
.flatMap(length -> flushWithResponse(length, false, false, httpHeaders, requestConditions));
}
Expand Down Expand Up @@ -487,7 +487,7 @@ private Mono<Void> uploadFileChunks(long fileOffset, long fileSize, ParallelTran

return appendWithResponse(progressData, fileOffset + chunk.getOffset(), chunk.getCount(), null,
requestConditions.getLeaseId());
})
}, parallelTransferOptions.getMaxConcurrency())
.then(Mono.defer(() ->
flushWithResponse(fileSize, false, false, headers, requestConditions)))
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.util.concurrent.Queues;
Comment thread
alzimmermsft marked this conversation as resolved.
Outdated

/**
* This class provides helper methods for common model patterns.
Expand Down Expand Up @@ -57,6 +58,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
: other.getNumBuffers(),
other.getProgressReceiver(),
other.getMaxSingleUploadSize() == null ? Integer.valueOf(MAX_APPEND_FILE_BYTES)
: other.getMaxSingleUploadSize());
: other.getMaxSingleUploadSize(),
// Queues.SMALL_BUFFER_SIZE is the default used by reactor
other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency());
Comment thread
alzimmermsft marked this conversation as resolved.
Outdated
}
}