From 6d0162a2eda29f3daaa599fda3529b2c427f3a6a Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Wed, 20 May 2020 11:37:28 -0700 Subject: [PATCH 1/8] Barely started --- .../blob/models/ParallelTransferOptions.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index db293a76cb7a..67ca9354be18 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -19,6 +19,7 @@ public final class ParallelTransferOptions { private final Integer numBuffers; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; + private final Integer maxConcurrency; /** * Creates a new {@link ParallelTransferOptions} with default parameters applied. @@ -62,7 +63,35 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. */ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, - Integer maxSingleUploadSize) { + Integer maxSingleUploadSize) { + this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null); + } + + /** + * Creates a new {@link ParallelTransferOptions} with default parameters applied. + * + * @param blockSize The block size. + * For upload, The block size is the size of each block that will be staged. This value also determines the number + * of requests that need to be made. If block size is large, upload will make fewer network calls, but each + * individual call will send more data and will therefore take longer. This parameter also determines the size + * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may + * be up to blockSize * numBuffers. + * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method + * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the + * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. + * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * @param progressReceiver {@link ProgressReceiver} + * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a + * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be + * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests + * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before + * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the + * service accepts for uploading in a single requests and is represented by + * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. + * @param maxConcurrency The maximum number of parallel requests + */ + public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, + Integer maxSingleUploadSize, Integer maxConcurrency) { if (blockSize != null) { StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES); } From f44b9601af8d8745c692c29d7434f0136bd6ae6f Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Wed, 27 May 2020 10:06:13 -0700 Subject: [PATCH 2/8] Added max concurrency parameter --- sdk/storage/azure-storage-blob/CHANGELOG.md | 1 + .../azure/storage/blob/BlobAsyncClient.java | 5 +-- .../blob/implementation/util/ModelHelper.java | 5 ++- .../blob/models/ParallelTransferOptions.java | 12 +++++- .../blob/specialized/BlobAsyncClientBase.java | 2 +- .../blob/specialized/BlobClientBase.java | 9 ++--- .../common/ParallelTransferOptions.java | 38 +++++++++++++++++++ .../datalake/DataLakeFileAsyncClient.java | 4 +- .../implementation/util/ModelHelper.java | 5 ++- 9 files changed, 67 insertions(+), 14 deletions(-) diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index 548b137d4b19..66d9d376a224 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -1,6 +1,7 @@ # Release History ## 12.7.0-beta.1 (Unreleased) +- Added a maxConcurrency option on ParallelTransferOptions that allows the customer to limit how many concurrent network requests will be outstanding at once. - Fixed a bug that caused auth failures when constructing a client to a secondary endpoint using token auth. ## 12.6.1 (2020-05-06) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index 83f6c1b7ecd4..67417122904a 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -359,7 +359,6 @@ public Mono> uploadWithResponse(Flux data, private Mono> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient, Flux data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, Map metadata, AccessTier tier, BlobRequestConditions requestConditions) { - // TODO: Parallelism parameter? Or let Reactor handle it? // TODO: Sample/api reference // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong. AtomicLong totalProgress = new AtomicLong(); @@ -392,7 +391,7 @@ private Mono> uploadInChunks(BlockBlobAsyncClient blockB .map(x -> blockId) .doFinally(x -> pool.returnBuffer(buffer)) .flux(); - }) // TODO: parallelism? + }, parallelTransferOptions.getMaxConcurrency()) .collect(Collectors.toList()) .flatMap(ids -> blockBlobAsyncClient.commitBlockListWithResponse(ids, headers, metadata, tier, requestConditions)); @@ -538,7 +537,7 @@ private Mono uploadFileChunks(long fileSize, ParallelTransferOptions paral return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null, finalRequestConditions.getLeaseId()); - }) + }, parallelTransferOptions.getMaxConcurrency()) .then(Mono.defer(() -> client.commitBlockListWithResponse( new ArrayList<>(blockIds.values()), headers, metadata, tier, finalRequestConditions))) .then(); diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java index 5d50a388712d..78e3fa027dde 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java @@ -6,6 +6,7 @@ import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import reactor.util.concurrent.Queues; import java.net.MalformedURLException; import java.net.URL; @@ -44,7 +45,9 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO : other.getNumBuffers(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES) - : other.getMaxSingleUploadSize()); + : other.getMaxSingleUploadSize(), + // Queues.SMALL_BUFFER_SIZE is the default used by reactor + other.getMaxConcurrency() == null ? Queues.SMALL_BUFFER_SIZE : other.getMaxConcurrency()); } /** diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index 67ca9354be18..7335a340008c 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -88,7 +88,7 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the * service accepts for uploading in a single requests and is represented by * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. - * @param maxConcurrency The maximum number of parallel requests + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time. */ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, Integer maxSingleUploadSize, Integer maxConcurrency) { @@ -108,6 +108,8 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES); } this.maxSingleUploadSize = maxSingleUploadSize; + + this.maxConcurrency = maxConcurrency; } /** @@ -141,4 +143,12 @@ public ProgressReceiver getProgressReceiver() { public Integer getMaxSingleUploadSize() { return this.maxSingleUploadSize; } + + /** + * Gets the maximum number of parallel requests that will be issued at any given time. + * @return The max concurrency value. + */ + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } } diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index cfa417c04718..fff547695a77 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -897,7 +897,7 @@ private Mono> downloadToFileImpl(AsynchronousFileChanne .flatMap(response -> writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock, totalProgress)); - }) + }, finalParallelTransferOptions.getMaxConcurrency()) // Only the first download call returns a value. .then(Mono.just(buildBlobPropertiesResponse(initialResponse))); }); diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java index 850ec4be8824..a15035981d1c 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java @@ -11,7 +11,6 @@ import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.polling.SyncPoller; -import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.models.BlobProperties; import com.azure.storage.blob.BlobServiceVersion; @@ -728,8 +727,8 @@ public Response setMetadataWithResponse(Map metadata, Blob *

For more information, see the * Azure Docs

* - * @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use - * {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot. + * @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use + * {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot. */ public BlobClientBase createSnapshot() { return createSnapshotWithResponse(null, null, null, Context.NONE).getValue(); @@ -750,8 +749,8 @@ public BlobClientBase createSnapshot() { * @param requestConditions {@link BlobRequestConditions} * @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised. * @param context Additional context that is passed through the Http pipeline during the service call. - * @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use - * {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot. + * @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use + * {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot. */ public Response createSnapshotWithResponse(Map metadata, BlobRequestConditions requestConditions, Duration timeout, Context context) { diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java index e20356d8b1ac..4d88f4256eab 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java @@ -17,6 +17,7 @@ public final class ParallelTransferOptions { private final Integer numBuffers; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; + private final Integer maxConcurrency; /** * Creates a new {@link ParallelTransferOptions} with default parameters applied. @@ -42,6 +43,34 @@ public final class ParallelTransferOptions { */ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, Integer maxSingleUploadSize) { + this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null); + } + + /** + * Creates a new {@link ParallelTransferOptions} with default parameters applied. + * + * @param blockSize The block size. + * For upload, The block size is the size of each block that will be staged. This value also determines the number + * of requests that need to be made. This parameter also determines the size that each buffer uses when buffering + * is required and consequently amount of memory consumed by such methods may be up to blockSize * numBuffers. + * For download to file, the block size is the size of each data chunk returned from the service. + * For both applications, If block size is large, upload will make fewer network calls, but each + * individual call will send more data and will therefore take longer. + * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method + * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the + * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. + * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * @param progressReceiver {@link ProgressReceiver} + * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a + * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be + * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests + * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before + * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the + * service accepts for uploading in a single requests, which varies depending on the service. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time. + */ + public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, + Integer maxSingleUploadSize, Integer maxConcurrency) { this.blockSize = blockSize; if (numBuffers != null) { StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE); @@ -49,6 +78,7 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe this.numBuffers = numBuffers; this.progressReceiver = progressReceiver; this.maxSingleUploadSize = maxSingleUploadSize; + this.maxConcurrency = maxConcurrency; } /** @@ -82,4 +112,12 @@ public ProgressReceiver getProgressReceiver() { public Integer getMaxSingleUploadSize() { return this.maxSingleUploadSize; } + + /** + * Gets the maximum number of parallel requests that will be issued at any given time. + * @return The max concurrency value. + */ + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } } diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java index 02b8ed8ffa13..75069046fb8c 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java @@ -334,7 +334,7 @@ private Mono> uploadInChunks(Flux data, long file .doFinally(x -> pool.returnBuffer(buffer)) .map(resp -> currentBufferLength + currentOffset) /* End of file after append to pass to flush. */ .flux(); - }) + }, parallelTransferOptions.getMaxConcurrency()) .last() .flatMap(length -> flushWithResponse(length, false, false, httpHeaders, requestConditions)); } @@ -487,7 +487,7 @@ private Mono uploadFileChunks(long fileOffset, long fileSize, ParallelTran return appendWithResponse(progressData, fileOffset + chunk.getOffset(), chunk.getCount(), null, requestConditions.getLeaseId()); - }) + }, parallelTransferOptions.getMaxConcurrency()) .then(Mono.defer(() -> flushWithResponse(fileSize, false, false, headers, requestConditions))) .then(); diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java index a25b6fa17ca0..a56b0773cd81 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java @@ -6,6 +6,7 @@ import com.azure.storage.common.ParallelTransferOptions; import com.azure.storage.common.implementation.Constants; import com.azure.storage.common.implementation.StorageImplUtils; +import reactor.util.concurrent.Queues; /** * This class provides helper methods for common model patterns. @@ -57,6 +58,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO : other.getNumBuffers(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(MAX_APPEND_FILE_BYTES) - : other.getMaxSingleUploadSize()); + : other.getMaxSingleUploadSize(), + // Queues.SMALL_BUFFER_SIZE is the default used by reactor + other.getMaxConcurrency() == null ? Queues.SMALL_BUFFER_SIZE : other.getMaxConcurrency()); } } From a1916abb161d9304c89075d5a14b239d9173df31 Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Wed, 27 May 2020 12:32:59 -0700 Subject: [PATCH 3/8] spotbugs fix --- .../com/azure/storage/blob/implementation/util/ModelHelper.java | 2 +- .../storage/file/datalake/implementation/util/ModelHelper.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java index 78e3fa027dde..e903fd0d12fa 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java @@ -47,7 +47,7 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES) : other.getMaxSingleUploadSize(), // Queues.SMALL_BUFFER_SIZE is the default used by reactor - other.getMaxConcurrency() == null ? Queues.SMALL_BUFFER_SIZE : other.getMaxConcurrency()); + other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency()); } /** diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java index a56b0773cd81..73a8b1da8691 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java @@ -60,6 +60,6 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO other.getMaxSingleUploadSize() == null ? Integer.valueOf(MAX_APPEND_FILE_BYTES) : other.getMaxSingleUploadSize(), // Queues.SMALL_BUFFER_SIZE is the default used by reactor - other.getMaxConcurrency() == null ? Queues.SMALL_BUFFER_SIZE : other.getMaxConcurrency()); + other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency()); } } From 2be2aec1a84f126de528c0afd4e7d2b571a3829d Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Tue, 2 Jun 2020 12:24:59 -0700 Subject: [PATCH 4/8] Merged maxConcurrency and numBuffers --- sdk/storage/azure-storage-blob/CHANGELOG.md | 2 +- .../azure/storage/blob/BlobAsyncClient.java | 2 +- .../blob/implementation/util/ModelHelper.java | 10 ++- .../blob/models/ParallelTransferOptions.java | 71 +++++++------------ .../common/ParallelTransferOptions.java | 59 ++++++--------- .../datalake/DataLakeFileAsyncClient.java | 2 +- .../storage/file/datalake/Transforms.java | 2 +- .../implementation/util/ModelHelper.java | 8 +-- 8 files changed, 56 insertions(+), 100 deletions(-) diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index 66d9d376a224..0331b824eb16 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -1,7 +1,7 @@ # Release History ## 12.7.0-beta.1 (Unreleased) -- Added a maxConcurrency option on ParallelTransferOptions that allows the customer to limit how many concurrent network requests will be outstanding at once. +- Added a maxConcurrency option on ParallelTransferOptions that allows the customer to limit how many concurrent network requests will be outstanding per api request at once. - Fixed a bug that caused auth failures when constructing a client to a secondary endpoint using token auth. ## 12.6.1 (2020-05-06) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index 67417122904a..4c4d331bafbc 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -365,7 +365,7 @@ private Mono> uploadInChunks(BlockBlobAsyncClient blockB Lock progressLock = new ReentrantLock(); // Validation done in the constructor. - UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency(), parallelTransferOptions.getBlockSize(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java index e903fd0d12fa..63e903432168 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java @@ -41,13 +41,11 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO return new ParallelTransferOptions( other.getBlockSize() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize(), - other.getNumBuffers() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS) - : other.getNumBuffers(), + other.getMaxConcurrency() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS) + : other.getMaxConcurrency(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES) - : other.getMaxSingleUploadSize(), - // Queues.SMALL_BUFFER_SIZE is the default used by reactor - other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency()); + : other.getMaxSingleUploadSize()); } /** @@ -58,7 +56,7 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO public static com.azure.storage.common.ParallelTransferOptions wrapBlobOptions( ParallelTransferOptions blobOptions) { Integer blockSize = blobOptions.getBlockSize(); - Integer numBuffers = blobOptions.getNumBuffers(); + Integer numBuffers = blobOptions.getMaxConcurrency(); com.azure.storage.common.ProgressReceiver wrappedReceiver = blobOptions.getProgressReceiver() == null ? null : blobOptions.getProgressReceiver()::reportProgress; diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index 7335a340008c..3474a5b91b15 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -16,10 +16,9 @@ public final class ParallelTransferOptions { private final Integer blockSize; - private final Integer numBuffers; + private final Integer maxConcurrency; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; - private final Integer maxConcurrency; /** * Creates a new {@link ParallelTransferOptions} with default parameters applied. @@ -30,14 +29,14 @@ public final class ParallelTransferOptions { * individual call will send more data and will therefore take longer. This parameter also determines the size * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may * be up to blockSize * numBuffers. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the + * @param maxConcurrency For buffered upload only, the number of buffers is the maximum number of buffers this + * method should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. * @param progressReceiver {@link ProgressReceiver} */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver) { - this(blockSize, numBuffers, progressReceiver, null); + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver) { + this(blockSize, maxConcurrency, progressReceiver, null); } /** @@ -48,11 +47,15 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * of requests that need to be made. If block size is large, upload will make fewer network calls, but each * individual call will send more data and will therefore take longer. This parameter also determines the size * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may - * be up to blockSize * numBuffers. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. - * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * be up to blockSize * maxConcurrency. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of + * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at + * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, + * up to 5 for each of the upload operations. For buffered uploads only, this value is the maximum number of buffers + * to be allocated as part of the transfer. In those cases, memory will be allocated lazily as needed and this value + * must be at least two. The amount of memory consumed by methods which buffer may be up to + * blockSize * maxConcurrency. In general, upload methods which do not accept a length parameter must perform some + * buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be @@ -62,45 +65,22 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * service accepts for uploading in a single requests and is represented by * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver, Integer maxSingleUploadSize) { - this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null); - } - - /** - * Creates a new {@link ParallelTransferOptions} with default parameters applied. - * - * @param blockSize The block size. - * For upload, The block size is the size of each block that will be staged. This value also determines the number - * of requests that need to be made. If block size is large, upload will make fewer network calls, but each - * individual call will send more data and will therefore take longer. This parameter also determines the size - * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may - * be up to blockSize * numBuffers. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. - * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. - * @param progressReceiver {@link ProgressReceiver} - * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a - * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be - * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests - * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before - * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the - * service accepts for uploading in a single requests and is represented by - * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. - * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time. - */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, - Integer maxSingleUploadSize, Integer maxConcurrency) { if (blockSize != null) { StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES); } this.blockSize = blockSize; - if (numBuffers != null) { - StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE); + /* + In buffered upload cases, maxConcurrency must be at least 2 as it also indicates the number of buffers we will + allocate. That minimum is validated in the UploadBufferPool. Because this value is also used in file transfers + which do not buffer, we only check the absolute minimum for this value here. + */ + if (maxConcurrency != null) { + StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE); } - this.numBuffers = numBuffers; + this.maxConcurrency = maxConcurrency; this.progressReceiver = progressReceiver; if (maxSingleUploadSize != null) { @@ -108,8 +88,6 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES); } this.maxSingleUploadSize = maxSingleUploadSize; - - this.maxConcurrency = maxConcurrency; } /** @@ -123,9 +101,10 @@ public Integer getBlockSize() { /** * Gets the number of buffers being used for a transfer operation. * @return The number of buffers. + * @deprecated Use {@link #getMaxConcurrency()} */ public Integer getNumBuffers() { - return this.numBuffers; + return this.maxConcurrency; } /** diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java index 4d88f4256eab..ff9f1b43d8a0 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java @@ -14,10 +14,9 @@ public final class ParallelTransferOptions { private final Integer blockSize; - private final Integer numBuffers; + private final Integer maxConcurrency; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; - private final Integer maxConcurrency; /** * Creates a new {@link ParallelTransferOptions} with default parameters applied. @@ -29,10 +28,14 @@ public final class ParallelTransferOptions { * For download to file, the block size is the size of each data chunk returned from the service. * For both applications, If block size is large, upload will make fewer network calls, but each * individual call will send more data and will therefore take longer. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. - * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of + * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at + * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, + * up to 5 for each of the upload operations. For buffered uploads only, this value is the maximum number of buffers + * to be allocated as part of the transfer. In those cases, memory will be allocated lazily as needed and this value + * must be at least two. The amount of memory consumed by methods which buffer may be up to + * blockSize * maxConcurrency. In general, upload methods which do not accept a length parameter must perform some + * buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be @@ -41,44 +44,21 @@ public final class ParallelTransferOptions { * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the * service accepts for uploading in a single requests, which varies depending on the service. */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver, Integer maxSingleUploadSize) { - this(blockSize, numBuffers, progressReceiver, maxSingleUploadSize, null); - } - /** - * Creates a new {@link ParallelTransferOptions} with default parameters applied. - * - * @param blockSize The block size. - * For upload, The block size is the size of each block that will be staged. This value also determines the number - * of requests that need to be made. This parameter also determines the size that each buffer uses when buffering - * is required and consequently amount of memory consumed by such methods may be up to blockSize * numBuffers. - * For download to file, the block size is the size of each data chunk returned from the service. - * For both applications, If block size is large, upload will make fewer network calls, but each - * individual call will send more data and will therefore take longer. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. - * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. - * @param progressReceiver {@link ProgressReceiver} - * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a - * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be - * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests - * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before - * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the - * service accepts for uploading in a single requests, which varies depending on the service. - * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time. - */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, - Integer maxSingleUploadSize, Integer maxConcurrency) { this.blockSize = blockSize; - if (numBuffers != null) { - StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE); + /* + In buffered upload cases, maxConcurrency must be at least 2 as it also indicates the number of buffers we will + allocate. That minimum is validated in the UploadBufferPool. Because this value is also used in file transfers + which do not buffer, we only check the absolute minimum for this value here. + */ + this.maxConcurrency = maxConcurrency; + if (maxConcurrency != null) { + StorageImplUtils.assertInBounds("numBuffers", maxConcurrency, 1, Integer.MAX_VALUE); } - this.numBuffers = numBuffers; this.progressReceiver = progressReceiver; this.maxSingleUploadSize = maxSingleUploadSize; - this.maxConcurrency = maxConcurrency; } /** @@ -92,9 +72,10 @@ public Integer getBlockSize() { /** * Gets the number of buffers being used for a transfer operation. * @return The number of buffers. + * @deprecated Use {@link #getMaxConcurrency()} */ public Integer getNumBuffers() { - return this.numBuffers; + return this.maxConcurrency; } /** diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java index 75069046fb8c..2feafef089bb 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java @@ -295,7 +295,7 @@ private Mono> uploadInChunks(Flux data, long file Lock progressLock = new ReentrantLock(); // Validation done in the constructor. - UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency(), parallelTransferOptions.getBlockSize(), MAX_APPEND_FILE_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, parallelTransferOptions); diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java index 55d10747c7cf..a36f8ddab562 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java @@ -440,7 +440,7 @@ static com.azure.storage.blob.models.ParallelTransferOptions toBlobParallelTrans if (pto == null) { return null; } - return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getNumBuffers(), + return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getMaxConcurrency(), Transforms.toBlobProgressReceiver(pto.getProgressReceiver()), pto.getMaxSingleUploadSize()); } diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java index 73a8b1da8691..fb999ca8977a 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java @@ -54,12 +54,10 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO return new ParallelTransferOptions( other.getBlockSize() == null ? Integer.valueOf(FILE_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize(), - other.getNumBuffers() == null ? Integer.valueOf(FILE_DEFAULT_NUMBER_OF_BUFFERS) - : other.getNumBuffers(), + other.getMaxConcurrency() == null ? Integer.valueOf(FILE_DEFAULT_NUMBER_OF_BUFFERS) + : other.getMaxConcurrency(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(MAX_APPEND_FILE_BYTES) - : other.getMaxSingleUploadSize(), - // Queues.SMALL_BUFFER_SIZE is the default used by reactor - other.getMaxConcurrency() == null ? Integer.valueOf(Queues.SMALL_BUFFER_SIZE) : other.getMaxConcurrency()); + : other.getMaxSingleUploadSize()); } } From d75791db14e45ffa2ca8cfbf05ab0f7e0c9b7c3c Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Tue, 2 Jun 2020 14:58:03 -0700 Subject: [PATCH 5/8] PR feedback --- .../storage/blob/implementation/util/ModelHelper.java | 1 - .../storage/blob/models/ParallelTransferOptions.java | 9 ++++----- .../azure/storage/common/ParallelTransferOptions.java | 11 +++++------ .../datalake/implementation/util/ModelHelper.java | 1 - 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java index 63e903432168..135b0f6f293c 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java @@ -6,7 +6,6 @@ import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.models.ParallelTransferOptions; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; -import reactor.util.concurrent.Queues; import java.net.MalformedURLException; import java.net.URL; diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index 3474a5b91b15..6b4e969c4a97 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -51,11 +51,10 @@ public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, Progre * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, - * up to 5 for each of the upload operations. For buffered uploads only, this value is the maximum number of buffers - * to be allocated as part of the transfer. In those cases, memory will be allocated lazily as needed and this value - * must be at least two. The amount of memory consumed by methods which buffer may be up to - * blockSize * maxConcurrency. In general, upload methods which do not accept a length parameter must perform some - * buffering. + * up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be + * allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated + * lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency. + * In general, upload methods which do not accept a length parameter must perform some buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java index ff9f1b43d8a0..1be97c210861 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java @@ -31,11 +31,10 @@ public final class ParallelTransferOptions { * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, - * up to 5 for each of the upload operations. For buffered uploads only, this value is the maximum number of buffers - * to be allocated as part of the transfer. In those cases, memory will be allocated lazily as needed and this value - * must be at least two. The amount of memory consumed by methods which buffer may be up to - * blockSize * maxConcurrency. In general, upload methods which do not accept a length parameter must perform some - * buffering. + * up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be + * allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated + * lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency. + * In general, upload methods which do not accept a length parameter must perform some buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be @@ -55,7 +54,7 @@ public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, Progre */ this.maxConcurrency = maxConcurrency; if (maxConcurrency != null) { - StorageImplUtils.assertInBounds("numBuffers", maxConcurrency, 1, Integer.MAX_VALUE); + StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE); } this.progressReceiver = progressReceiver; this.maxSingleUploadSize = maxSingleUploadSize; diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java index fb999ca8977a..00dc604592bf 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java @@ -6,7 +6,6 @@ import com.azure.storage.common.ParallelTransferOptions; import com.azure.storage.common.implementation.Constants; import com.azure.storage.common.implementation.StorageImplUtils; -import reactor.util.concurrent.Queues; /** * This class provides helper methods for common model patterns. From 8e88d371e955d52f59f063dcc29229b01b74f4f1 Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Tue, 2 Jun 2020 15:00:44 -0700 Subject: [PATCH 6/8] Pr feedback --- .../src/main/java/com/azure/storage/blob/BlobAsyncClient.java | 2 +- .../azure/storage/file/datalake/DataLakeFileAsyncClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index 4c4d331bafbc..3b73a43fe579 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -365,7 +365,7 @@ private Mono> uploadInChunks(BlockBlobAsyncClient blockB Lock progressLock = new ReentrantLock(); // Validation done in the constructor. - UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1, parallelTransferOptions.getBlockSize(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java index 2feafef089bb..a8f12bddcd6a 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java @@ -295,7 +295,7 @@ private Mono> uploadInChunks(Flux data, long file Lock progressLock = new ReentrantLock(); // Validation done in the constructor. - UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1, parallelTransferOptions.getBlockSize(), MAX_APPEND_FILE_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, parallelTransferOptions); From 310bbfb7e4a20c6445b960919f3632528aae4d0c Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Tue, 2 Jun 2020 16:14:11 -0700 Subject: [PATCH 7/8] CI fixes --- .../com/azure/storage/blob/models/ParallelTransferOptions.java | 1 + .../java/com/azure/storage/common/ParallelTransferOptions.java | 1 + 2 files changed, 2 insertions(+) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index 6b4e969c4a97..ebf859ed5de3 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -102,6 +102,7 @@ public Integer getBlockSize() { * @return The number of buffers. * @deprecated Use {@link #getMaxConcurrency()} */ + @Deprecated public Integer getNumBuffers() { return this.maxConcurrency; } diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java index 1be97c210861..d29381b68cab 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java @@ -73,6 +73,7 @@ public Integer getBlockSize() { * @return The number of buffers. * @deprecated Use {@link #getMaxConcurrency()} */ + @Deprecated public Integer getNumBuffers() { return this.maxConcurrency; } From 68a5a192fe483618a1d636e2a49c3e771bd69274 Mon Sep 17 00:00:00 2001 From: Rick Ley Date: Wed, 3 Jun 2020 12:31:37 -0700 Subject: [PATCH 8/8] CI fixes --- .../azure/storage/blob/models/ParallelTransferOptions.java | 4 ++-- .../azure/storage/blob/specialized/BlockBlobAPITest.groovy | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index ebf859ed5de3..1e14c5c1e2db 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -30,8 +30,8 @@ public final class ParallelTransferOptions { * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may * be up to blockSize * numBuffers. * @param maxConcurrency For buffered upload only, the number of buffers is the maximum number of buffers this - * method should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. + * method should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger + * the number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. * @param progressReceiver {@link ProgressReceiver} */ diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy index 2a9503679c4f..3b6a556ceda9 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy @@ -1270,7 +1270,7 @@ class BlockBlobAPITest extends APISpec { bufferSize | numBuffs 0 | 5 BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES + 1 | 5 - 5 | 1 + 5 | 0 } // Only run these tests in live mode as they use variables that can't be captured.