diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md
index 04b4e22f69a1..85bf9fb81d67 100644
--- a/sdk/storage/azure-storage-blob/CHANGELOG.md
+++ b/sdk/storage/azure-storage-blob/CHANGELOG.md
@@ -2,7 +2,10 @@
 ## 12.2.0-beta.2 (Unreleased)
 - Added a field to ParallelTransferOptions that allows customers to configure the maximum size to upload in a single PUT. Data sizes larger than this value will be chunked and parallelized.
-- Added overloads to downloadToFile to add the option to overwrite existing files. Default behavior is to not overwrite.
+- Added overloads to downloadToFile to add the option to overwrite existing files. Default behavior is to not overwrite.
+- Improved performance of BlockBlobOutputStream.
+- Added overloads to BlockBlobClient.getBlobOutputStream to allow users to provide parallel transfer options, http headers, metadata, access tier, and request conditions.
+
 ## 12.2.0-beta.1 (2019-12-17)
 - Added SAS generation methods on clients to improve discoverability and convenience of sas. Deprecated setContainerName, setBlobName, setSnapshotId, generateSasQueryParameters methods on BlobServiceSasSignatureValues to direct users to using the methods added on clients.
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java
index c5caea22918a..2ed7d6ca5d18 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java
@@ -3,24 +3,28 @@
 package com.azure.storage.blob.specialized;
 
 import com.azure.core.util.logging.ClientLogger;
+import com.azure.storage.blob.BlobAsyncClient;
+import com.azure.storage.blob.BlobClientBuilder;
+import com.azure.storage.blob.models.AccessTier;
 import com.azure.storage.blob.models.AppendBlobRequestConditions;
+import com.azure.storage.blob.models.BlobHttpHeaders;
 import com.azure.storage.blob.models.BlobRequestConditions;
 import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.CpkInfo;
+import com.azure.storage.blob.models.CustomerProvidedKey;
 import com.azure.storage.blob.models.PageBlobRequestConditions;
 import com.azure.storage.blob.models.PageRange;
+import com.azure.storage.blob.models.ParallelTransferOptions;
 import com.azure.storage.common.StorageOutputStream;
 import com.azure.storage.common.implementation.Constants;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Base64;
-import java.util.List;
-import java.util.UUID;
+import java.util.Map;
 
 /**
  * BlobOutputStream allows for the uploading of data to a blob using a stream-like approach.
@@ -37,8 +41,9 @@ static BlobOutputStream appendBlobOutputStream(final AppendBlobAsyncClient clien
     }
 
     static BlobOutputStream blockBlobOutputStream(final BlockBlobAsyncClient client,
-        final BlobRequestConditions requestConditions) {
-        return new BlockBlobOutputStream(client, requestConditions);
+        final ParallelTransferOptions parallelTransferOptions, final BlobHttpHeaders headers,
+        final Map<String, String> metadata, final AccessTier tier, final BlobRequestConditions requestConditions) {
+        return new BlockBlobOutputStream(client, parallelTransferOptions, headers, metadata, tier, requestConditions);
     }
 
     static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange,
@@ -70,6 +75,11 @@ public synchronized void close() throws IOException {
             } catch (final BlobStorageException e) {
                 throw new IOException(e);
             }
+            /* Need this check because, for block blobs, a buffered upload error only manifests itself after commit
+            is called. */
+            if (this.lastError != null) {
+                throw lastError;
+            }
         } finally {
             // if close() is called again, an exception will be thrown
             this.lastError = new IOException(Constants.STREAM_CLOSED);
@@ -134,62 +144,74 @@ void commit() {
     }
 
     private static final class BlockBlobOutputStream extends BlobOutputStream {
-        private final BlobRequestConditions requestConditions;
-        private final String blockIdPrefix;
-        private final List<String> blockList;
-        private final BlockBlobAsyncClient client;
+
+        private FluxSink<ByteBuffer> sink;
+
+        boolean complete;
 
         private BlockBlobOutputStream(final BlockBlobAsyncClient client,
-            final BlobRequestConditions requestConditions) {
+            final ParallelTransferOptions parallelTransferOptions, final BlobHttpHeaders headers,
+            final Map<String, String> metadata, final AccessTier tier, final BlobRequestConditions requestConditions) {
             super(BlockBlobClient.MAX_STAGE_BLOCK_BYTES);
-            this.client = client;
-            this.requestConditions = (requestConditions == null) ? new BlobRequestConditions() : requestConditions;
-            this.blockIdPrefix = UUID.randomUUID().toString() + '-';
-            this.blockList = new ArrayList<>();
-        }
 
-        /**
-         * Generates a new block ID to be used for PutBlock.
-         *
-         * @return Base64 encoded block ID
-         */
-        private String getCurrentBlockId() {
-            String blockIdSuffix = String.format("%06d", this.blockList.size());
-            return Base64.getEncoder().encodeToString((this.blockIdPrefix + blockIdSuffix)
-                .getBytes(StandardCharsets.UTF_8));
-        }
+            BlobAsyncClient blobClient = prepareBuilder(client).buildAsyncClient();
 
-        private Mono<Void> writeBlock(Flux<ByteBuffer> blockData, String blockId, long writeLength) {
-            return client.stageBlockWithResponse(blockId, blockData, writeLength, null,
-                this.requestConditions.getLeaseId())
-                .then()
+            Flux<ByteBuffer> fbb = Flux.create((FluxSink<ByteBuffer> sink) -> this.sink = sink);
+
+            /* Waiting for uploadWithResponse to subscribe takes too long; the sink is only created on subscription.
+            We subscribe here so that the sink exists before the first write arrives. Since this subscriber doesn't
+            do anything and no data has started flowing, there are no drawbacks to this extra subscribe. */
+            fbb.subscribe();
+
+            blobClient.uploadWithResponse(fbb, parallelTransferOptions, headers, metadata, tier, requestConditions)
+                // This allows the operation to continue while maintaining the error that occurred.
                .onErrorResume(BlobStorageException.class, e -> {
                    this.lastError = new IOException(e);
                    return Mono.empty();
-                });
+                })
+                .doOnTerminate(() -> complete = true)
+                .subscribe();
         }
 
-        @Override
-        protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
-            if (writeLength == 0) {
-                return Mono.empty();
+        private BlobClientBuilder prepareBuilder(BlobAsyncClientBase client) {
+            BlobClientBuilder builder = new BlobClientBuilder()
+                .pipeline(client.getHttpPipeline())
+                .endpoint(client.getBlobUrl())
+                .snapshot(client.getSnapshotId())
+                .serviceVersion(client.getServiceVersion());
+
+            CpkInfo cpk = client.getCustomerProvidedKey();
+            if (cpk != null) {
+                builder.customerProvidedKey(new CustomerProvidedKey(cpk.getEncryptionKey()));
             }
-            final String blockID = this.getCurrentBlockId();
-            this.blockList.add(blockID);
+            return builder;
+        }
 
-            Flux<ByteBuffer> fbb = Flux.range(0, 1)
-                .concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));
+        @Override
+        void commit() {
+            sink.complete();
+
+            // Need to wait until the upload completes.
+            while (!complete) {
+                try {
+                    Thread.sleep(100L);
+                } catch (InterruptedException e) {
+                    // Does this need to be logged through ClientLogger?
+                    e.printStackTrace();
+                }
+            }
+        }
 
-            return this.writeBlock(fbb.subscribeOn(Schedulers.elastic()), blockID, writeLength);
+        @Override
+        protected void writeInternal(final byte[] data, int offset, int length) {
+            sink.next(ByteBuffer.wrap(data, offset, length));
         }
 
-        /**
-         * Commits the blob, for block blob this uploads the block list.
-         */
+        // Never called; writeInternal is overridden above, so writes bypass dispatchWrite entirely.
         @Override
-        synchronized void commit() {
-            client.commitBlockListWithResponse(this.blockList, null, null, null, this.requestConditions).block();
+        protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
+            return Mono.empty();
         }
     }
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobClient.java
index 7d8a48e6b865..08be8ec2e766 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobClient.java
@@ -18,6 +18,7 @@
 import com.azure.storage.blob.models.BlockBlobItem;
 import com.azure.storage.blob.models.BlockList;
 import com.azure.storage.blob.models.BlockListType;
+import com.azure.storage.blob.models.ParallelTransferOptions;
 import com.azure.storage.common.Utility;
 import com.azure.storage.common.implementation.Constants;
 import reactor.core.publisher.Flux;
@@ -115,7 +116,29 @@ public BlobOutputStream getBlobOutputStream(boolean overwrite) {
      * @throws BlobStorageException If a storage service error occurred.
      */
     public BlobOutputStream getBlobOutputStream(BlobRequestConditions requestConditions) {
-        return BlobOutputStream.blockBlobOutputStream(client, requestConditions);
+        return getBlobOutputStream(null, null, null, null, requestConditions);
+    }
+
+    /**
+     * Creates and opens an output stream to write data to the block blob. If the blob already exists on the service,
+     * it will be overwritten.
+     * <p>
+     * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
+     *
+     * @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
+     * @param headers {@link BlobHttpHeaders}
+     * @param metadata Metadata to associate with the blob.
+     * @param tier {@link AccessTier} for the destination blob.
+     * @param requestConditions {@link BlobRequestConditions}
+     *
+     * @return A {@link BlobOutputStream} object used to write data to the blob.
+     * @throws BlobStorageException If a storage service error occurred.
+     */
+    public BlobOutputStream getBlobOutputStream(ParallelTransferOptions parallelTransferOptions,
+        BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
+        BlobRequestConditions requestConditions) {
+        return BlobOutputStream.blockBlobOutputStream(client, parallelTransferOptions, headers, metadata, tier,
+            requestConditions);
     }
 
     /**
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy
index d42f857fee31..667013540f7d 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy
@@ -39,7 +39,6 @@ class BlobOutputStreamTest extends APISpec {
 
         and:
         blockBlobClient.getBlobOutputStream()
-
         then:
         thrown(IllegalArgumentException)
     }
diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java
index cdeb0a129e81..78a7c6e0bf8b 100644
--- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java
+++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java
@@ -42,7 +42,7 @@ protected StorageOutputStream(final int writeThreshold) {
      * @param offset An int which represents the start offset in the data.
      * @param length An int which represents the number of bytes to write.
      */
-    private void writeInternal(final byte[] data, int offset, int length) {
+    protected void writeInternal(final byte[] data, int offset, int length) {
         int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold));
         Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold)
             .concatMap(pos -> processChunk(data, pos, offset, length))
@@ -125,7 +125,7 @@ public void write(@NonNull final byte[] data, final int offset, final int length
      * <p>
      * true is acceptable for you.
      *
-     * @param byteVal An int which represents the bye value to write.
+     * @param byteVal An int which represents the byte value to write.
      */
     @Override
     public void write(final int byteVal) {
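---

Reviewer note: a sketch of how the new `getBlobOutputStream` overload is expected to be used. The connection string, container/blob names, content type, metadata, the one-megabyte block size, and the three-argument `ParallelTransferOptions` constructor are illustrative assumptions, not part of this change:

```java
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlobOutputStream;
import com.azure.storage.blob.specialized.BlockBlobClient;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class GetBlobOutputStreamSample {
    public static void main(String[] args) throws IOException {
        // Hypothetical client setup, for illustration only.
        BlockBlobClient blockBlob = new BlobContainerClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
            .containerName("sample-container")
            .buildClient()
            .getBlobClient("sample-blob.txt")
            .getBlockBlobClient();

        // Buffer in 1 MB blocks and stage up to two blocks in parallel.
        ParallelTransferOptions parallelTransferOptions =
            new ParallelTransferOptions(1024 * 1024, 2, null);

        // "*" for If-None-Match makes the commit fail rather than overwrite an existing blob.
        BlobRequestConditions requestConditions = new BlobRequestConditions().setIfNoneMatch("*");

        try (BlobOutputStream stream = blockBlob.getBlobOutputStream(
                parallelTransferOptions,
                new BlobHttpHeaders().setContentType("text/plain"),
                Collections.singletonMap("origin", "sample"),
                AccessTier.HOT,
                requestConditions)) {
            stream.write("hello world".getBytes(StandardCharsets.UTF_8));
        } // close() commits the blob; with this patch it also rethrows any buffered-upload error.
    }
}
```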
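A second note on the mechanism inside the rewritten `BlockBlobOutputStream`: synchronous writes are pushed into a `Flux.create` sink that feeds the buffered `uploadWithResponse` pipeline. Below is a minimal standalone sketch of that handoff pattern, with a `CountDownLatch` standing in for the patch's `Thread.sleep` polling loop; the class and method names are invented for illustration and this is not the SDK's code:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class SinkBridgeSketch {
    private FluxSink<ByteBuffer> sink;
    private final CountDownLatch done = new CountDownLatch(1);

    public SinkBridgeSketch() {
        // Flux.create hands us a sink that imperative code can push buffers into.
        Flux<ByteBuffer> data = Flux.create(emitter -> this.sink = emitter);

        // Subscribe immediately so the sink exists before the first write() arrives,
        // mirroring the early fbb.subscribe() in the patch.
        data.map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
            .doOnNext(System.out::println)  // stand-in for the real upload consumer
            .doOnTerminate(done::countDown) // signal the blocking side on completion or error
            .subscribe();
    }

    public void write(byte[] bytes) {
        // Corresponds to writeInternal: hand the chunk to the reactive pipeline.
        sink.next(ByteBuffer.wrap(bytes));
    }

    public void commit() throws InterruptedException {
        sink.complete(); // no more data will arrive
        done.await();    // block until the pipeline terminates
    }

    public static void main(String[] args) throws InterruptedException {
        SinkBridgeSketch bridge = new SinkBridgeSketch();
        bridge.write("hello".getBytes(StandardCharsets.UTF_8));
        bridge.commit();
    }
}
```

Whether a latch or the patch's polling loop is used, the ordering constraints are the same: the sink must be created (by subscribing) before the first write, and commit must not return until the terminal signal fires.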