Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.7.0-beta.1 (Unreleased)
- Added a maxConcurrency option on ParallelTransferOptions that allows customers to limit how many concurrent network requests will be outstanding per API request at once.
- Added an overload to BlobClient.upload which returns a BlockBlobItem containing the properties returned by the service upon blob creation.
- Fixed a bug that caused auth failures when constructing a client to a secondary endpoint using token auth.
- Modified client constructors to throw on invalid urls early to prevent SAS tokens from being logged in Exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,13 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient,
Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
Map<String, String> metadata, AccessTier tier, BlobRequestConditions requestConditions) {
// TODO: Parallelism parameter? Or let Reactor handle it?
// TODO: Sample/api reference
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong();
Lock progressLock = new ReentrantLock();

// Validation done in the constructor.
UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(),
UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1,
parallelTransferOptions.getBlockSize(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES);

Flux<ByteBuffer> chunkedSource = UploadUtils.chunkSource(data,
Expand All @@ -391,7 +390,7 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
.map(x -> blockId)
.doFinally(x -> pool.returnBuffer(buffer))
.flux();
}) // TODO: parallelism?
}, parallelTransferOptions.getMaxConcurrency())
.collect(Collectors.toList())
.flatMap(ids ->
blockBlobAsyncClient.commitBlockListWithResponse(ids, headers, metadata, tier, requestConditions));
Expand Down Expand Up @@ -537,7 +536,7 @@ private Mono<Void> uploadFileChunks(long fileSize, ParallelTransferOptions paral

return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null,
finalRequestConditions.getLeaseId());
})
}, parallelTransferOptions.getMaxConcurrency())
.then(Mono.defer(() -> client.commitBlockListWithResponse(
new ArrayList<>(blockIds.values()), headers, metadata, tier, finalRequestConditions)))
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
return new ParallelTransferOptions(
other.getBlockSize() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE)
: other.getBlockSize(),
other.getNumBuffers() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS)
: other.getNumBuffers(),
other.getMaxConcurrency() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS)
: other.getMaxConcurrency(),
other.getProgressReceiver(),
other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)
: other.getMaxSingleUploadSize());
Expand All @@ -55,7 +55,7 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
public static com.azure.storage.common.ParallelTransferOptions wrapBlobOptions(
ParallelTransferOptions blobOptions) {
Integer blockSize = blobOptions.getBlockSize();
Integer numBuffers = blobOptions.getNumBuffers();
Integer numBuffers = blobOptions.getMaxConcurrency();
com.azure.storage.common.ProgressReceiver wrappedReceiver = blobOptions.getProgressReceiver() == null
? null
: blobOptions.getProgressReceiver()::reportProgress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public final class ParallelTransferOptions {

private final Integer blockSize;
private final Integer numBuffers;
private final Integer maxConcurrency;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;

Expand All @@ -29,14 +29,14 @@ public final class ParallelTransferOptions {
* individual call will send more data and will therefore take longer. This parameter also determines the size
* that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may
be up to blockSize * maxConcurrency.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of
* a single parallel transfer. This value applies per API. For buffered uploads only, the maximum number of buffers
* to be allocated as part of the transfer will be {@code maxConcurrency + 1}, and memory will be allocated lazily
* as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency.
* @param progressReceiver {@link ProgressReceiver}
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver) {
this(blockSize, numBuffers, progressReceiver, null);
public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver) {
this(blockSize, maxConcurrency, progressReceiver, null);
}

/**
Expand All @@ -47,11 +47,14 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
* of requests that need to be made. If block size is large, upload will make fewer network calls, but each
* individual call will send more data and will therefore take longer. This parameter also determines the size
* that each buffer uses when buffering is required and consequently amount of memory consumed by such methods may
* be up to blockSize * numBuffers.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
* be up to blockSize * maxConcurrency.
* @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of
* a single parallel transfer. This value applies per API. For example, if two calls to uploadFromFile are made at
* the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests,
* up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be
* allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated
* lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency.
* In general, upload methods which do not accept a length parameter must perform some buffering.
* @param progressReceiver {@link ProgressReceiver}
* @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
* single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
Expand All @@ -61,17 +64,22 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
* service accepts for uploading in a single requests and is represented by
* {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}.
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {
if (blockSize != null) {
StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
}
this.blockSize = blockSize;

if (numBuffers != null) {
StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
/*
In buffered upload cases, the buffer pool requires at least two buffers. Allocating maxConcurrency + 1 buffers
(a minimum validated in UploadBufferPool) satisfies that for any maxConcurrency >= 1. Because this value is also
used in file transfers, which do not buffer, we only check the absolute minimum of 1 for this value here.
Comment on lines +75 to +77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can solve this problem by allocating maxConcurrency+1 buffers in the UploadbufferPool.

*/
if (maxConcurrency != null) {
StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE);
}
this.numBuffers = numBuffers;
this.maxConcurrency = maxConcurrency;
this.progressReceiver = progressReceiver;

if (maxSingleUploadSize != null) {
Expand All @@ -92,9 +100,11 @@ public Integer getBlockSize() {
/**
* Gets the number of buffers being used for a transfer operation.
* @return The number of buffers.
* @deprecated Use {@link #getMaxConcurrency()}
*/
@Deprecated
public Integer getNumBuffers() {
return this.numBuffers;
return this.maxConcurrency;
}

/**
Expand All @@ -112,4 +122,12 @@ public ProgressReceiver getProgressReceiver() {
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}

/**
* Gets the maximum number of parallel requests that will be issued at any given time.
* @return The max concurrency value.
*/
public Integer getMaxConcurrency() {
return this.maxConcurrency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne
.flatMap(response ->
writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions, progressLock,
totalProgress));
})
}, finalParallelTransferOptions.getMaxConcurrency())
// Only the first download call returns a value.
.then(Mono.just(buildBlobPropertiesResponse(initialResponse)));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.polling.SyncPoller;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.BlobServiceVersion;
Expand Down Expand Up @@ -728,8 +727,8 @@ public Response<Void> setMetadataWithResponse(Map<String, String> metadata, Blob
* <p>For more information, see the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/snapshot-blob">Azure Docs</a></p>
*
* @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use
* {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot.
* @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use
* {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot.
*/
public BlobClientBase createSnapshot() {
return createSnapshotWithResponse(null, null, null, Context.NONE).getValue();
Expand All @@ -750,8 +749,8 @@ public BlobClientBase createSnapshot() {
* @param requestConditions {@link BlobRequestConditions}
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing a {@link BlobClient} which is used to interact with the created snapshot, use
* {@link BlobClient#getSnapshotId()} to get the identifier for the snapshot.
* @return A response containing a {@link BlobClientBase} which is used to interact with the created snapshot, use
* {@link BlobClientBase#getSnapshotId()} to get the identifier for the snapshot.
*/
public Response<BlobClientBase> createSnapshotWithResponse(Map<String, String> metadata,
BlobRequestConditions requestConditions, Duration timeout, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ class BlockBlobAPITest extends APISpec {
bufferSize | numBuffs
0 | 5
BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES + 1 | 5
5 | 1
5 | 0
}

// Only run these tests in live mode as they use variables that can't be captured.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public final class ParallelTransferOptions {

private final Integer blockSize;
private final Integer numBuffers;
private final Integer maxConcurrency;
private final ProgressReceiver progressReceiver;
private final Integer maxSingleUploadSize;

Expand All @@ -28,10 +28,13 @@ public final class ParallelTransferOptions {
* For download to file, the block size is the size of each data chunk returned from the service.
* For both applications, If block size is large, upload will make fewer network calls, but each
* individual call will send more data and will therefore take longer.
* @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
* should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
* number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
* The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
* @param maxConcurrency The maximum number of parallel requests that will be issued at any given time as a part of
* a single parallel transfer. This value applies per API. For example, if two calls to uploadFromFile are made at
* the same time, and each specifies a maxConcurrency of 5, there may be up to 10 outstanding, concurrent requests,
* up to 5 for each of the upload operations. For buffered uploads only, the maximum number of buffers to be
* allocated as part of the transfer will be {@code maxConcurrency + 1}. In those cases, memory will be allocated
* lazily as needed. The amount of memory consumed by methods which buffer may be up to blockSize * maxConcurrency.
* In general, upload methods which do not accept a length parameter must perform some buffering.
* @param progressReceiver {@link ProgressReceiver}
* @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
* single put rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
Expand All @@ -40,13 +43,19 @@ public final class ParallelTransferOptions {
* any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the
* service accepts for uploading in a single requests, which varies depending on the service.
*/
public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
public ParallelTransferOptions(Integer blockSize, Integer maxConcurrency, ProgressReceiver progressReceiver,
Integer maxSingleUploadSize) {

this.blockSize = blockSize;
if (numBuffers != null) {
StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
/*
In buffered upload cases, the buffer pool requires at least two buffers. Allocating maxConcurrency + 1 buffers
(a minimum validated in UploadBufferPool) satisfies that for any maxConcurrency >= 1. Because this value is also
used in file transfers, which do not buffer, we only check the absolute minimum of 1 for this value here.
*/
this.maxConcurrency = maxConcurrency;
if (maxConcurrency != null) {
StorageImplUtils.assertInBounds("maxConcurrency", maxConcurrency, 1, Integer.MAX_VALUE);
}
this.numBuffers = numBuffers;
this.progressReceiver = progressReceiver;
this.maxSingleUploadSize = maxSingleUploadSize;
}
Expand All @@ -62,9 +71,11 @@ public Integer getBlockSize() {
/**
* Gets the number of buffers being used for a transfer operation.
* @return The number of buffers.
* @deprecated Use {@link #getMaxConcurrency()}
*/
@Deprecated
public Integer getNumBuffers() {
return this.numBuffers;
return this.maxConcurrency;
}

/**
Expand All @@ -82,4 +93,12 @@ public ProgressReceiver getProgressReceiver() {
public Integer getMaxSingleUploadSize() {
return this.maxSingleUploadSize;
}

/**
* Gets the maximum number of parallel requests that will be issued at any given time.
* @return The max concurrency value.
*/
public Integer getMaxConcurrency() {
return this.maxConcurrency;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
Lock progressLock = new ReentrantLock();

// Validation done in the constructor.
UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getNumBuffers(),
UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1,
parallelTransferOptions.getBlockSize(), MAX_APPEND_FILE_BYTES);

Flux<ByteBuffer> chunkedSource = UploadUtils.chunkSource(data, parallelTransferOptions);
Expand Down Expand Up @@ -334,7 +334,7 @@ private Mono<Response<PathInfo>> uploadInChunks(Flux<ByteBuffer> data, long file
.doFinally(x -> pool.returnBuffer(buffer))
.map(resp -> currentBufferLength + currentOffset) /* End of file after append to pass to flush. */
.flux();
})
}, parallelTransferOptions.getMaxConcurrency())
.last()
.flatMap(length -> flushWithResponse(length, false, false, httpHeaders, requestConditions));
}
Expand Down Expand Up @@ -487,7 +487,7 @@ private Mono<Void> uploadFileChunks(long fileOffset, long fileSize, ParallelTran

return appendWithResponse(progressData, fileOffset + chunk.getOffset(), chunk.getCount(), null,
requestConditions.getLeaseId());
})
}, parallelTransferOptions.getMaxConcurrency())
.then(Mono.defer(() ->
flushWithResponse(fileSize, false, false, headers, requestConditions)))
.then();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ static com.azure.storage.blob.models.ParallelTransferOptions toBlobParallelTrans
if (pto == null) {
return null;
}
return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getNumBuffers(),
return new com.azure.storage.blob.models.ParallelTransferOptions(pto.getBlockSize(), pto.getMaxConcurrency(),
Transforms.toBlobProgressReceiver(pto.getProgressReceiver()), pto.getMaxSingleUploadSize());
}

Expand Down
Loading