diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index d5e857579acb..4f3c9fdec4ad 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -1,6 +1,7 @@ # Release History ## 12.7.0-beta.1 (Unreleased) +- Added a maxConcurrency option on ParallelTransferOptions that allows the customer to limit how many concurrent network requests will be outstanding per api request at once. - Added an overload to BlobClient.upload which returns a BlockBlobItem containing the properties returned by the service upon blob creation. - Fixed a bug that caused auth failures when constructing a client to a secondary endpoint using token auth. - Modified client constructors to throw on invalid urls early to prevent SAS tokens from being logged in Exceptions. diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index cf6ed960aa50..cdfc182e8698 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -359,14 +359,13 @@ public Mono> uploadWithResponse(Flux data, private Mono> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient, Flux data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, Map metadata, AccessTier tier, BlobRequestConditions requestConditions) { - // TODO: Parallelism parameter? Or let Reactor handle it? // TODO: Sample/api reference // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong. AtomicLong totalProgress = new AtomicLong(); Lock progressLock = new ReentrantLock(); // Validation done in the constructor. 
- UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1, parallelTransferOptions.getBlockSize(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, @@ -391,7 +390,7 @@ private Mono> uploadInChunks(BlockBlobAsyncClient blockB .map(x -> blockId) .doFinally(x -> pool.returnBuffer(buffer)) .flux(); - }) // TODO: parallelism? + }, parallelTransferOptions.getMaxConcurrency()) .collect(Collectors.toList()) .flatMap(ids -> blockBlobAsyncClient.commitBlockListWithResponse(ids, headers, metadata, tier, requestConditions)); @@ -537,7 +536,7 @@ private Mono uploadFileChunks(long fileSize, ParallelTransferOptions paral return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null, finalRequestConditions.getLeaseId()); - }) + }, parallelTransferOptions.getMaxConcurrency()) .then(Mono.defer(() -> client.commitBlockListWithResponse( new ArrayList<>(blockIds.values()), headers, metadata, tier, finalRequestConditions))) .then(); diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java index 5d50a388712d..135b0f6f293c 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java @@ -40,8 +40,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO return new ParallelTransferOptions( other.getBlockSize() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize(), - other.getNumBuffers() == null ? 
Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS) - : other.getNumBuffers(), + other.getMaxConcurrency() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS) + : other.getMaxConcurrency(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES) : other.getMaxSingleUploadSize()); @@ -55,7 +55,7 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO public static com.azure.storage.common.ParallelTransferOptions wrapBlobOptions( ParallelTransferOptions blobOptions) { Integer blockSize = blobOptions.getBlockSize(); - Integer numBuffers = blobOptions.getNumBuffers(); + Integer numBuffers = blobOptions.getMaxConcurrency(); com.azure.storage.common.ProgressReceiver wrappedReceiver = blobOptions.getProgressReceiver() == null ? null : blobOptions.getProgressReceiver()::reportProgress; diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java index db293a76cb7a..1e14c5c1e2db 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java @@ -16,7 +16,7 @@ public final class ParallelTransferOptions { private final Integer blockSize; - private final Integer numBuffers; + private final Integer maxConcurrency; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; @@ -29,14 +29,14 @@ public final class ParallelTransferOptions { * individual call will send more data and will therefore take longer. This parameter also determines the size * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may * be up to blockSize * numBuffers. 
- * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of + * a single parallel transfer. Must be at least 1. For buffered uploads, up to {@code maxConcurrency + 1} buffers + * may be allocated; memory is allocated lazily as needed. * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. * @param progressReceiver {@link ProgressReceiver} */ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver) { - this(blockSize, numBuffers, progressReceiver, null); + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver) { + this(blockSize, maxConcurrency, progressReceiver, null); } /** @@ -47,11 +47,14 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * of requests that need to be made. If block size is large, upload will make fewer network calls, but each * individual call will send more data and will therefore take longer. This parameter also determines the size * that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may - be up to blockSize * numBuffers. - * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. 
- * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * be up to blockSize * maxConcurrency. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of + * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at + * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, + * up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be + * allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated + * lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency. + * In general, upload methods which do not accept a length parameter must perform some buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be @@ -61,17 +64,22 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe * service accepts for uploading in a single requests and is represented by * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}. 
*/ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, - Integer maxSingleUploadSize) { + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver, + Integer maxSingleUploadSize) { if (blockSize != null) { StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES); } this.blockSize = blockSize; - if (numBuffers != null) { - StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE); + /* + In buffered upload cases, maxConcurrency must be at least 2 as it also indicates the number of buffers we will + allocate. That minimum is validated in the UploadBufferPool. Because this value is also used in file transfers + which do not buffer, we only check the absolute minimum for this value here. + */ + if (maxConcurrency != null) { + StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE); } - this.numBuffers = numBuffers; + this.maxConcurrency = maxConcurrency; this.progressReceiver = progressReceiver; if (maxSingleUploadSize != null) { @@ -92,9 +100,11 @@ public Integer getBlockSize() { /** * Gets the number of buffers being used for a transfer operation. * @return The number of buffers. + * @deprecated Use {@link #getMaxConcurrency()} */ + @Deprecated public Integer getNumBuffers() { - return this.numBuffers; + return this.maxConcurrency; } /** @@ -112,4 +122,12 @@ public ProgressReceiver getProgressReceiver() { public Integer getMaxSingleUploadSize() { return this.maxSingleUploadSize; } + + /** + * Gets the maximum number of parallel requests that will be issued at any given time. + * @return The max concurrency value. 
+ */ + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } } diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java index 4062478e4b78..f77f25d4b7d3 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java @@ -905,7 +905,7 @@ private Mono> downloadToFileImpl(AsynchronousFileChanne .flatMap(response -> writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock, totalProgress)); - }) + }, finalParallelTransferOptions.getMaxConcurrency()) // Only the first download call returns a value. .then(Mono.just(buildBlobPropertiesResponse(initialResponse))); }); diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java index 850ec4be8824..a15035981d1c 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobClientBase.java @@ -11,7 +11,6 @@ import com.azure.core.util.Context; import com.azure.core.util.logging.ClientLogger; import com.azure.core.util.polling.SyncPoller; -import com.azure.storage.blob.BlobClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.models.BlobProperties; import com.azure.storage.blob.BlobServiceVersion; @@ -728,8 +727,8 @@ public Response setMetadataWithResponse(Map metadata, Blob *

For more information, see the * Azure Docs

* - * @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use - * {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot. + * @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use + * {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot. */ public BlobClientBase createSnapshot() { return createSnapshotWithResponse(null, null, null, Context.NONE).getValue(); @@ -750,8 +749,8 @@ public BlobClientBase createSnapshot() { * @param requestConditions {@link BlobRequestConditions} * @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised. * @param context Additional context that is passed through the Http pipeline during the service call. - * @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use - * {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot. + * @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use + * {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot. 
*/ public Response createSnapshotWithResponse(Map metadata, BlobRequestConditions requestConditions, Duration timeout, Context context) { diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy index 2a9503679c4f..3b6a556ceda9 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy @@ -1270,7 +1270,7 @@ class BlockBlobAPITest extends APISpec { bufferSize | numBuffs 0 | 5 BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES + 1 | 5 - 5 | 1 + 5 | 0 } // Only run these tests in live mode as they use variables that can't be captured. diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java index e20356d8b1ac..d29381b68cab 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/ParallelTransferOptions.java @@ -14,7 +14,7 @@ public final class ParallelTransferOptions { private final Integer blockSize; - private final Integer numBuffers; + private final Integer maxConcurrency; private final ProgressReceiver progressReceiver; private final Integer maxSingleUploadSize; @@ -28,10 +28,13 @@ public final class ParallelTransferOptions { * For download to file, the block size is the size of each data chunk returned from the service. * For both applications, If block size is large, upload will make fewer network calls, but each * individual call will send more data and will therefore take longer. 
- * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method - * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the - * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be. - * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers. + * @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of + * a single parallel transfer. This value applies per api. For example, if two calls to uploadFromFile are made at + * the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests, + * up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be + * allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated + * lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency. + * In general, upload methods which do not accept a length parameter must perform some buffering. * @param progressReceiver {@link ProgressReceiver} * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a * single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be @@ -40,13 +43,19 @@ public final class ParallelTransferOptions { * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the * service accepts for uploading in a single requests, which varies depending on the service. 
*/ - public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver, + public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver, Integer maxSingleUploadSize) { + this.blockSize = blockSize; - if (numBuffers != null) { - StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE); + /* + In buffered upload cases, maxConcurrency must be at least 2 as it also indicates the number of buffers we will + allocate. That minimum is validated in the UploadBufferPool. Because this value is also used in file transfers + which do not buffer, we only check the absolute minimum for this value here. + */ + this.maxConcurrency = maxConcurrency; + if (maxConcurrency != null) { + StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE); } - this.numBuffers = numBuffers; this.progressReceiver = progressReceiver; this.maxSingleUploadSize = maxSingleUploadSize; } @@ -62,9 +71,11 @@ public Integer getBlockSize() { /** * Gets the number of buffers being used for a transfer operation. * @return The number of buffers. + * @deprecated Use {@link #getMaxConcurrency()} */ + @Deprecated public Integer getNumBuffers() { - return this.numBuffers; + return this.maxConcurrency; } /** @@ -82,4 +93,12 @@ public ProgressReceiver getProgressReceiver() { public Integer getMaxSingleUploadSize() { return this.maxSingleUploadSize; } + + /** + * Gets the maximum number of parallel requests that will be issued at any given time. + * @return The max concurrency value. 
+ */ + public Integer getMaxConcurrency() { + return this.maxConcurrency; + } } diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java index 02b8ed8ffa13..a8f12bddcd6a 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java @@ -295,7 +295,7 @@ private Mono> uploadInChunks(Flux data, long file Lock progressLock = new ReentrantLock(); // Validation done in the constructor. - UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(), + UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1, parallelTransferOptions.getBlockSize(), MAX_APPEND_FILE_BYTES); Flux chunkedSource = UploadUtils.chunkSource(data, parallelTransferOptions); @@ -334,7 +334,7 @@ private Mono> uploadInChunks(Flux data, long file .doFinally(x -> pool.returnBuffer(buffer)) .map(resp -> currentBufferLength + currentOffset) /* End of file after append to pass to flush. 
*/ .flux(); - }) + }, parallelTransferOptions.getMaxConcurrency()) .last() .flatMap(length -> flushWithResponse(length, false, false, httpHeaders, requestConditions)); } @@ -487,7 +487,7 @@ private Mono uploadFileChunks(long fileOffset, long fileSize, ParallelTran return appendWithResponse(progressData, fileOffset + chunk.getOffset(), chunk.getCount(), null, requestConditions.getLeaseId()); - }) + }, parallelTransferOptions.getMaxConcurrency()) .then(Mono.defer(() -> flushWithResponse(fileSize, false, false, headers, requestConditions))) .then(); diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java index 55d10747c7cf..a36f8ddab562 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java +++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/Transforms.java @@ -440,7 +440,7 @@ static com.azure.storage.blob.models.ParallelTransferOptions toBlobParallelTrans if (pto == null) { return null; } - return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getNumBuffers(), + return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getMaxConcurrency(), Transforms.toBlobProgressReceiver(pto.getProgressReceiver()), pto.getMaxSingleUploadSize()); } diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java index a25b6fa17ca0..00dc604592bf 100644 --- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java +++ 
b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/implementation/util/ModelHelper.java @@ -53,8 +53,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO return new ParallelTransferOptions( other.getBlockSize() == null ? Integer.valueOf(FILE_DEFAULT_UPLOAD_BLOCK_SIZE) : other.getBlockSize(), - other.getNumBuffers() == null ? Integer.valueOf(FILE_DEFAULT_NUMBER_OF_BUFFERS) - : other.getNumBuffers(), + other.getMaxConcurrency() == null ? Integer.valueOf(FILE_DEFAULT_NUMBER_OF_BUFFERS) + : other.getMaxConcurrency(), other.getProgressReceiver(), other.getMaxSingleUploadSize() == null ? Integer.valueOf(MAX_APPEND_FILE_BYTES) : other.getMaxSingleUploadSize());