diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md
index 32fd9eeafeaf..0f0cfc08654e 100644
--- a/sdk/storage/azure-storage-blob/CHANGELOG.md
+++ b/sdk/storage/azure-storage-blob/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Release History

 ## 12.2.0-beta.2 (Unreleased)
+- Added a field to ParallelTransferOptions that allows customers to configure the maximum size of data uploaded in a single PUT. Data larger than this value will be chunked and parallelized.

 ## 12.2.0-beta.1 (2019-12-17)
 - Added SAS generation methods on clients to improve discoverability and convenience of sas. Deprecated setContainerName, setBlobName, setSnapshotId, generateSasQueryParameters methods on BlobServiceSasSignatureValues to direct users to using the methods added on clients.
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
index 0450df7d194d..de7a3dc42655 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
@@ -72,8 +72,6 @@
  * Docs for more information.
  */
 public class BlobAsyncClient extends BlobAsyncClientBase {
-    private static final int CHUNKED_UPLOAD_REQUIREMENT = 4 * Constants.MB;
-
     /**
      * The block size to use if none is specified in parallel operations.
      */
@@ -325,7 +323,8 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
                 .addProgressReporting(stream, validatedParallelTransferOptions.getProgressReceiver()),
                 length, headers, metadata, tier, null, validatedRequestConditions);

-            return determineUploadFullOrChunked(data, uploadInChunksFunction, uploadFullBlobMethod);
+            return determineUploadFullOrChunked(data, validatedParallelTransferOptions, uploadInChunksFunction,
+                uploadFullBlobMethod);
         } catch (RuntimeException ex) {
             return monoError(logger, ex);
         }
@@ -392,6 +391,7 @@ as we can guarantee we only need at most two buffers for any call to write (two
     }

     private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<ByteBuffer> data,
+        ParallelTransferOptions parallelTransferOptions,
         final Function<Flux<ByteBuffer>, Mono<Response<BlockBlobItem>>> uploadInChunks,
         final BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlob) {
         final long[] bufferedDataSize = {0};
@@ -408,12 +408,12 @@ private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<By
-                if (bufferedDataSize[0] > CHUNKED_UPLOAD_REQUIREMENT) {
+                if (bufferedDataSize[0] > parallelTransferOptions.getMaxSingleUploadSize()) {
                     return false;
                 } else {
                     bufferedDataSize[0] += buffer.remaining();
-                    if (bufferedDataSize[0] > CHUNKED_UPLOAD_REQUIREMENT) {
+                    if (bufferedDataSize[0] > parallelTransferOptions.getMaxSingleUploadSize()) {
                         return true;
                     } else {
                         /*
@@ -428,13 +428,13 @@ private Mono<Response<BlockBlobItem>> determineUploadFullOrChunked(final Flux<By
@@ -500,7 +500,7 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
         // Note that if the file will be uploaded using a putBlob, we also can skip the exists check.
         if (!overwrite) {
-            if (uploadInBlocks(filePath)) {
+            if (uploadInBlocks(filePath, BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)) {
                 overwriteCheck = exists().flatMap(exists -> exists
                     ? monoError(logger, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
                     : Mono.empty());
@@ -538,6 +538,8 @@ public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
     public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions,
         BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
         BlobRequestConditions requestConditions) {
+        final ParallelTransferOptions finalParallelTransferOptions =
+            ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
         try {
             return Mono.using(() -> uploadFileResourceSupplier(filePath), channel -> {
@@ -546,8 +548,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
                 long fileSize = channel.size();

                 // If the file is larger than 256MB chunk it and stage it as blocks.
-                if (uploadInBlocks(filePath)) {
-                    return uploadBlocks(fileSize, parallelTransferOptions, headers, metadata, tier,
+                if (uploadInBlocks(filePath, finalParallelTransferOptions.getMaxSingleUploadSize())) {
+                    return uploadBlocks(fileSize, finalParallelTransferOptions, headers, metadata, tier,
                         requestConditions, channel, blockBlobAsyncClient);
                 } else {
                     // Otherwise we know it can be sent in a single request reducing network overhead.
@@ -564,11 +566,11 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
         }
     }

-    boolean uploadInBlocks(String filePath) {
+    boolean uploadInBlocks(String filePath, Integer maxSingleUploadSize) {
         AsynchronousFileChannel channel = uploadFileResourceSupplier(filePath);
         boolean retVal;
         try {
-            retVal = channel.size() > 256 * Constants.MB;
+            retVal = channel.size() > maxSingleUploadSize;
         } catch (IOException e) {
             throw logger.logExceptionAsError(new UncheckedIOException(e));
         } finally {
@@ -581,17 +583,16 @@ boolean uploadInBlocks(String filePath) {
     private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelTransferOptions,
         BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
         BlobRequestConditions requestConditions, AsynchronousFileChannel channel, BlockBlobAsyncClient client) {
-        final ParallelTransferOptions finalParallelTransferOptions =
-            ModelHelper.populateAndApplyDefaults(parallelTransferOptions);
         final BlobRequestConditions finalRequestConditions = (requestConditions == null)
             ? new BlobRequestConditions() : requestConditions;
+        // parallelTransferOptions are finalized in the calling method.

         // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
         AtomicLong totalProgress = new AtomicLong();
         Lock progressLock = new ReentrantLock();

         final SortedMap<Long, String> blockIds = new TreeMap<>();
-        return Flux.fromIterable(sliceFile(fileSize, finalParallelTransferOptions.getBlockSize(),
+        return Flux.fromIterable(sliceFile(fileSize, parallelTransferOptions.getBlockSize(),
             parallelTransferOptions))
             .flatMap(chunk -> {
                 String blockId = getBlockID();
@@ -599,7 +600,7 @@ private Mono<Void> uploadBlocks(long fileSize, ParallelTransferOptions parallelT
                 Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                     FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
-                    finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);
+                    parallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

                 return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null,
                     finalRequestConditions.getLeaseId());
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobClient.java
index 718d9d4cf95b..9180d76f63b4 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobClient.java
@@ -145,7 +145,7 @@ public void uploadFromFile(String filePath, boolean overwrite) {
         if (!overwrite) {
             // Note we only want to make the exists call if we will be uploading in stages. Otherwise it is superfluous.
-            if (client.uploadInBlocks(filePath) && exists()) {
+            if (client.uploadInBlocks(filePath, BlockBlobClient.MAX_UPLOAD_BLOB_BYTES) && exists()) {
                 throw logger.logExceptionAsError(new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS));
             }
             requestConditions = new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
@@ -157,7 +157,7 @@ public void uploadFromFile(String filePath, boolean overwrite) {
      * Creates a new block blob, or updates the content of an existing block blob.
      * <p>
      * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
-     *
+     *
      * <p><strong>Code Samples</strong></p>
      *
      * {@codesnippet com.azure.storage.blob.BlobClient.uploadFromFile#String-ParallelTransferOptions-BlobHttpHeaders-Map-AccessTier-BlobRequestConditions-Duration}
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java
index f8a980a586f6..79737482d8f7 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/ModelHelper.java
@@ -5,6 +5,7 @@

 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;

 import java.util.regex.Pattern;

@@ -31,6 +32,8 @@ public static ParallelTransferOptions populateAndApplyDefaults(ParallelTransferO
                 : other.getBlockSize(),
             other.getNumBuffers() == null ? Integer.valueOf(BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS)
                 : other.getNumBuffers(),
-            other.getProgressReceiver());
+            other.getProgressReceiver(),
+            other.getMaxSingleUploadSize() == null ? Integer.valueOf(BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES)
+                : other.getMaxSingleUploadSize());
     }
 }
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
index 369976363a28..db293a76cb7a 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/models/ParallelTransferOptions.java
@@ -18,6 +18,7 @@ public final class ParallelTransferOptions {
     private final Integer blockSize;
     private final Integer numBuffers;
     private final ProgressReceiver progressReceiver;
+    private final Integer maxSingleUploadSize;

     /**
      * Creates a new {@link ParallelTransferOptions} with default parameters applied.
@@ -35,6 +36,33 @@ public final class ParallelTransferOptions {
      * @param progressReceiver {@link ProgressReceiver}
      */
     public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver) {
+        this(blockSize, numBuffers, progressReceiver, null);
+    }
+
+    /**
+     * Creates a new {@link ParallelTransferOptions} with default parameters applied.
+     *
+     * @param blockSize The block size.
+     * For upload, the block size is the size of each block that will be staged. This value also determines the number
+     * of requests that need to be made. If block size is large, upload will make fewer network calls, but each
+     * individual call will send more data and will therefore take longer. This parameter also determines the size
+     * that each buffer uses when buffering is required and consequently the amount of memory consumed by such methods
+     * may be up to blockSize * numBuffers.
+     * @param numBuffers For buffered upload only, the number of buffers is the maximum number of buffers this method
+     * should allocate. Memory will be allocated lazily as needed. Must be at least two. Typically, the larger the
+     * number of buffers, the more parallel, and thus faster, the upload portion of this operation will be.
+     * The amount of memory consumed by methods using this value may be up to blockSize * numBuffers.
+     * @param progressReceiver {@link ProgressReceiver}
+     * @param maxSingleUploadSize If the size of the data is less than or equal to this value, it will be uploaded in a
+     * single PUT rather than broken up into chunks. If the data is uploaded in a single shot, the block size will be
+     * ignored. Some constraints to consider are that more requests cost more, but several small or mid-sized requests
+     * may sometimes perform better. In the case of buffered upload, up to this amount of data may be buffered before
+     * any data is sent. Must be greater than 0. May be null to accept default behavior, which is the maximum value the
+     * service accepts for uploading in a single request and is represented by
+     * {@link BlockBlobAsyncClient#MAX_UPLOAD_BLOB_BYTES}.
+     */
+    public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressReceiver progressReceiver,
+        Integer maxSingleUploadSize) {
         if (blockSize != null) {
             StorageImplUtils.assertInBounds("blockSize", blockSize, 1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
         }
@@ -45,6 +73,12 @@ public ParallelTransferOptions(Integer blockSize, Integer numBuffers, ProgressRe
         }
         this.numBuffers = numBuffers;
         this.progressReceiver = progressReceiver;
+
+        if (maxSingleUploadSize != null) {
+            StorageImplUtils.assertInBounds("maxSingleUploadSize", maxSingleUploadSize, 1,
+                BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES);
+        }
+        this.maxSingleUploadSize = maxSingleUploadSize;
     }

     /**
@@ -65,9 +99,17 @@ public Integer getNumBuffers() {

     /**
      * Gets the Progress receiver for parallel reporting
-     * @return the progress reporter
+     * @return The progress reporter
      */
     public ProgressReceiver getProgressReceiver() {
         return this.progressReceiver;
     }
+
+    /**
+     * Gets the value above which the upload will be broken into blocks and parallelized.
+     * @return The threshold value.
+     */
+    public Integer getMaxSingleUploadSize() {
+        return this.maxSingleUploadSize;
+    }
 }
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ProgressReporterTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ProgressReporterTest.groovy
index e999bc602cee..871a32e2cdb3 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ProgressReporterTest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/ProgressReporterTest.groovy
@@ -103,5 +103,5 @@ class ProgressReporterTest extends APISpec {
         0 * mockReceiver.reportProgress({ it > 60 })
     }

-    // See TransferManagerTest for network tests of the parallel ProgressReporter.
+    // TODO (rickle-msft): See TransferManagerTest for network tests of the parallel ProgressReporter.
 }
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
index 587654085599..5f98a26dcb9d 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy
@@ -760,6 +760,60 @@ class BlockBlobAPITest extends APISpec {
         file.delete()
     }

+    @Unroll
+    @Requires({ liveMode() })
+    def "Upload from file reporter"() {
+        when:
+        def uploadReporter = new Reporter(blockSize)
+        def file = getRandomFile(size)
+
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions(blockSize, bufferCount,
+            uploadReporter)
+
+        then:
+        StepVerifier.create(blobAsyncClient.uploadFromFile(file.toPath().toString(), parallelTransferOptions,
+            null, null, null, null))
+            .assertNext({
+                assert uploadReporter.getReportingCount() == (long) (size / blockSize)
+            }).verifyComplete()
+
+        cleanup:
+        file.delete()
+
+        where:
+        size              | blockSize         | bufferCount
+        10 * Constants.MB | 10 * Constants.MB | 8
+        20 * Constants.MB | 1 * Constants.MB  | 5
+        10 * Constants.MB | 5 * Constants.MB  | 2
+        10 * Constants.MB | 10 * Constants.KB | 100
+    }
+
+    @Unroll
+    @Requires({ liveMode() })
+    def "Upload from file options"() {
+        setup:
+        def file = getRandomFile(dataSize)
+
+        when:
+        blobClient.uploadFromFile(file.toPath().toString(),
+            new ParallelTransferOptions(blockSize, null, null, singleUploadSize), null, null, null, null, null)
+
+        then:
+        blobClient.getBlockBlobClient()
+            .listBlocks(BlockListType.COMMITTED).getCommittedBlocks().size() == expectedBlockCount
+
+        cleanup:
+        file.delete()
+
+        where:
+        dataSize                                       | singleUploadSize | blockSize || expectedBlockCount
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES - 1 | null             | null      || 0 // Test that the default for singleUploadSize is the maximum
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null             | null      || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) // "". This also validates the default for blockSize
+        100                                            | 50               | null      || 1 // Test that singleUploadSize is respected
+        100                                            | 50               | 20        || 5 // Test that blockSize is respected
+    }
+
     def "Upload min"() {
         when:
         blockBlobClient.upload(defaultInputStream.get(), defaultDataSize, true)
@@ -1218,9 +1272,9 @@ class BlockBlobAPITest extends APISpec {
         then:
         StepVerifier.create(uploadOperation.then(blockBlobAsyncClient.getPropertiesWithResponse(null)))
             .assertNext({
-            assert validateBlobProperties(it, cacheControl, contentDisposition, contentEncoding, contentLanguage,
-                contentMD5, contentType == null ? "application/octet-stream" : contentType)
-        }).verifyComplete()
+                assert validateBlobProperties(it, cacheControl, contentDisposition, contentEncoding, contentLanguage,
+                    contentMD5, contentType == null ? "application/octet-stream" : contentType)
+            }).verifyComplete() // HTTP default content type is application/octet-stream.

         where:
@@ -1254,9 +1308,9 @@ class BlockBlobAPITest extends APISpec {
         then:
         StepVerifier.create(uploadOperation.then(blobAsyncClient.getPropertiesWithResponse(null)))
             .assertNext({
-            assert it.getStatusCode() == 200
-            assert it.getValue().getMetadata() == metadata
-        }).verifyComplete()
+                assert it.getStatusCode() == 200
+                assert it.getValue().getMetadata() == metadata
+            }).verifyComplete()

         where:
         key1  | value1 | key2   | value2
         null  | null   | null   | null
         "foo" | "bar"  | "fizz" | "buzz"
     }
@@ -1264,6 +1318,28 @@ class BlockBlobAPITest extends APISpec {

+    @Unroll
+    @Requires({ liveMode() })
+    def "Buffered upload options"() {
+        setup:
+        def data = getRandomData(dataSize)
+
+        when:
+        blobAsyncClient.uploadWithResponse(Flux.just(data),
+            new ParallelTransferOptions(blockSize, null, null, singleUploadSize), null, null, null, null).block()
+
+        then:
+        blobAsyncClient.getBlockBlobAsyncClient()
+            .listBlocks(BlockListType.COMMITTED).block().getCommittedBlocks().size() == expectedBlockCount
+
+        where:
+        dataSize                                       | singleUploadSize | blockSize || expectedBlockCount
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES - 1 | null             | null      || 0 // Test that the default for singleUploadSize is the maximum
+        BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1 | null             | null      || Math.ceil(((double) BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES + 1) / (double) BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE) // "". This also validates the default for blockSize
+        100                                            | 50               | null      || 1 // Test that singleUploadSize is respected
+        100                                            | 50               | 20        || 5 // Test that blockSize is respected
+    }
+
     // Only run these tests in live mode as they use variables that can't be captured.
     @Unroll
     @Requires({ liveMode() })
@@ -1427,9 +1503,9 @@ class BlockBlobAPITest extends APISpec {
         // A second subscription to a download stream will
         StepVerifier.create(blobAsyncClient.upload(blockBlobAsyncClient.download(), parallelTransferOptions, true))
             .verifyErrorSatisfies({
-            assert it instanceof BlobStorageException
-            assert it.getStatusCode() == 500
-        })
+                assert it instanceof BlobStorageException
+                assert it.getStatusCode() == 500
+            })
     }

     @Requires({ liveMode() })
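
Reviewer note: below is a minimal usage sketch of the new maxSingleUploadSize option, not part of the patch. The connection string, container, blob, and file names are hypothetical; the constructor and upload signatures follow the 12.2.0-beta.2 surface shown in this diff, and passing null for maxSingleUploadSize falls back to BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES via ModelHelper.populateAndApplyDefaults.

import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClientBuilder;
import com.azure.storage.blob.models.ParallelTransferOptions;

import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MaxSingleUploadSizeSample {
    public static void main(String[] args) {
        // Hypothetical connection details; substitute real values.
        BlobAsyncClient client = new BlobClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
            .containerName("sample-container")
            .blobName("sample-blob")
            .buildAsyncClient();

        // Data up to 1 MB goes out in a single PUT; anything larger is split into
        // 256 KB blocks and staged in parallel across up to 4 buffers.
        ParallelTransferOptions options = new ParallelTransferOptions(
            256 * 1024,   // blockSize
            4,            // numBuffers (buffered upload only; must be at least 2)
            null,         // progressReceiver
            1024 * 1024); // maxSingleUploadSize: the field added in this PR

        // File upload: the threshold is compared against the file size in uploadInBlocks(...).
        client.uploadFromFile("local-file.bin", options, null, null, null, null).block();

        // Buffered upload: the threshold is compared against the buffered byte count
        // in determineUploadFullOrChunked(...).
        Flux<ByteBuffer> data = Flux.just(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        client.uploadWithResponse(data, options, null, null, null, null).block();
    }
}

Since the buffered path may hold up to maxSingleUploadSize bytes before deciding between a single PUT and staged blocks, callers tuning memory use should consider both this value and blockSize * numBuffers.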