diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index fdf1ecab7e2c..7974e0546cd5 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -456,6 +456,7 @@ + @@ -623,4 +624,11 @@ RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE, UPM_UNCALLED_PRIVATE_METHOD"/> + + + + + + + diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java index b0033ae94ac5..015798e972c6 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobInputStream.java @@ -4,21 +4,20 @@ import com.azure.core.implementation.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; -import com.azure.storage.blob.BlobProperties; +import com.azure.storage.blob.BlobAsyncClient; import com.azure.storage.blob.models.BlobAccessConditions; import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.StorageException; import com.azure.storage.common.Constants; -import com.azure.storage.common.SR; +import com.azure.storage.common.StorageInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; /** * Provides an input stream to read a given blob resource. */ -public final class BlobInputStream extends InputStream { +public final class BlobInputStream extends StorageInputStream { private final ClientLogger logger = new ClientLogger(BlobInputStream.class); /** @@ -26,76 +25,20 @@ public final class BlobInputStream extends InputStream { */ private final BlobAsyncClientBase blobClient; - /** - * A flag to determine if the stream is faulted, if so the last error will be thrown on next operation. - */ - private volatile boolean streamFaulted; - - /** - * Holds the last exception this stream encountered. - */ - private IOException lastError; - - /** - * Holds the stream length. - */ - private final long streamLength; - - /** - * Holds the stream read size for both block and page blobs. - */ - private final int readSize; - - /** - * Holds the reference to the current buffered data. - */ - private ByteBuffer currentBuffer; - - /** - * Holds an absolute byte position for the mark feature. - */ - private long markedPosition; - - /** - * Holds the mark delta for which the mark position is expired. - */ - private int markExpiry; - - /** - * Holds an absolute byte position of the current read position. - */ - private long currentAbsoluteReadPosition; - - /** - * Holds the absolute byte position of the start of the current buffer. - */ - private long bufferStartOffset; - - /** - * Holds the length of the current buffer in bytes. - */ - private int bufferSize; - /** * Holds the {@link BlobAccessConditions} object that represents the access conditions for the blob. */ private final BlobAccessConditions accessCondition; - /** - * Offset of the source blob this class is configured to stream from. - */ - private final long blobRangeOffset; - /** * Initializes a new instance of the BlobInputStream class. * - * @param blobClient A {@link BlobAsyncClientBase} object which represents the blob that this stream is associated - * with. + * @param blobClient A {@link BlobAsyncClient} object which represents the blob that this stream is associated with. * @param accessCondition An {@link BlobAccessConditions} object which represents the access conditions for the * blob. * @throws StorageException An exception representing any error which occurred during the operation. */ - BlobInputStream(final BlobAsyncClientBase blobClient, final BlobAccessConditions accessCondition) + BlobInputStream(final BlobAsyncClient blobClient, final BlobAccessConditions accessCondition) throws StorageException { this(blobClient, 0, null, accessCondition); } @@ -113,61 +56,15 @@ public final class BlobInputStream extends InputStream { * @throws StorageException An exception representing any error which occurred during the operation. */ BlobInputStream(final BlobAsyncClientBase blobClient, long blobRangeOffset, Long blobRangeLength, - final BlobAccessConditions accessCondition) + final BlobAccessConditions accessCondition) throws StorageException { + super(blobRangeOffset, blobRangeLength, 4 * Constants.MB, + blobClient.getProperties().block().getBlobSize()); - this.blobRangeOffset = blobRangeOffset; this.blobClient = blobClient; - this.streamFaulted = false; - this.currentAbsoluteReadPosition = blobRangeOffset; - this.readSize = 4 * Constants.MB; this.accessCondition = accessCondition; - if (blobRangeOffset < 0 || (blobRangeLength != null && blobRangeLength <= 0)) { - throw new IndexOutOfBoundsException(); - } - - BlobProperties properties = blobClient.getProperties().block(); - this.streamLength = blobRangeLength == null - ? properties.getBlobSize() - this.blobRangeOffset - : Math.min(properties.getBlobSize() - this.blobRangeOffset, blobRangeLength); - - this.reposition(blobRangeOffset); - } - - /** - * Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without - * blocking by the next invocation of a method for this input stream. The next invocation might be the same thread - * or another thread. A single read or skip of this many bytes will not block, but may read or skip fewer bytes. - * - * @return An int which represents an estimate of the number of bytes that can be read (or skipped - * over) from this input stream without blocking, or 0 when it reaches the end of the input stream. - */ - @Override - public synchronized int available() { - return this.bufferSize - (int) (this.currentAbsoluteReadPosition - this.bufferStartOffset); - } - - /** - * Helper function to check if the stream is faulted, if it is it surfaces the exception. - * - * @throws IOException If an I/O error occurs. In particular, an IOException may be thrown if the output stream has - * been closed. - */ - private synchronized void checkStreamState() throws IOException { - if (this.streamFaulted) { - throw this.lastError; - } - } - /** - * Closes this input stream and releases any system resources associated with the stream. - */ - @Override - public synchronized void close() { - this.currentBuffer = null; - this.streamFaulted = true; - this.lastError = new IOException(SR.STREAM_CLOSED); } /** @@ -177,15 +74,17 @@ public synchronized void close() { * @param readLength An int which represents the number of bytes to read. * @throws IOException If an I/O error occurs. */ - private synchronized void dispatchRead(final int readLength) throws IOException { + @Override + protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) throws IOException { try { - this.currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(this.currentAbsoluteReadPosition, + ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset, (long) readLength), null, this.accessCondition, false) .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap)) .block(); this.bufferSize = readLength; - this.bufferStartOffset = this.currentAbsoluteReadPosition; + this.bufferStartOffset = offset; + return currentBuffer; } catch (final StorageException e) { this.streamFaulted = true; this.lastError = new IOException(e); @@ -193,228 +92,4 @@ private synchronized void dispatchRead(final int readLength) throws IOException } } - /** - * Marks the current position in this input stream. A subsequent call to the reset method repositions this stream at - * the last marked position so that subsequent reads re-read the same bytes. - * - * @param readlimit An int which represents the maximum limit of bytes that can be read before the mark - * position becomes invalid. - */ - @Override - public synchronized void mark(final int readlimit) { - this.markedPosition = this.currentAbsoluteReadPosition; - this.markExpiry = readlimit; - } - - /** - * Tests if this input stream supports the mark and reset methods. Whether or not mark and reset are supported is an - * invariant property of a particular input stream instance. The markSupported method of {@link InputStream} returns - * false. - * - * @return True if this stream instance supports the mark and reset methods; False - * otherwise. - */ - @Override - public boolean markSupported() { - return true; - } - - /** - * Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. If - * no byte is available because the end of the stream has been reached, the value -1 is returned. This method blocks - * until input data is available, the end of the stream is detected, or an exception is thrown. - * - * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no - * more data because the end of the stream has been reached. - * @throws IOException If an I/O error occurs. - */ - @Override - public int read() throws IOException { - final byte[] tBuff = new byte[1]; - final int numberOfBytesRead = this.read(tBuff, 0, 1); - - if (numberOfBytesRead > 0) { - return tBuff[0] & 0xFF; - } else if (numberOfBytesRead == 0) { - throw new IOException(SR.UNEXPECTED_STREAM_READ_ERROR); - } else { - return -1; - } - } - - /** - * Reads some number of bytes from the input stream and stores them into the buffer array b. The number - * of bytes actually read is returned as an integer. This method blocks until input data is available, end of file - * is detected, or an exception is thrown. If the length of b is zero, then no bytes are read and 0 is - * returned; otherwise, there is an attempt to read at least one byte. If no byte is available because the stream is - * at the end of the file, the value -1 is returned; otherwise, at least one byte is read and stored into - * b. - * - * The first byte read is stored into element b[0], the next one into b[1], and so on. The - * number of bytes read is, at most, equal to the length of b. Let k be the number of - * bytes actually read; these bytes will be stored in elements b[0] through b[k-1], - * leaving elements b[k] through - * b[b.length-1] unaffected. - * - * The read(b) method for class {@link InputStream} has the same effect as: - * - * read(b, 0, b.length) - * - * @param b A byte array which represents the buffer into which the data is read. - * @throws IOException If the first byte cannot be read for any reason other than the end of the file, if the input - * stream has been closed, or if some other I/O error occurs. - * @throws NullPointerException If the byte array b is null. - */ - @Override - public int read(final byte[] b) throws IOException { - return this.read(b, 0, b.length); - } - - /** - * Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to - * read as many as len bytes, but a smaller number may be read. The number of bytes actually read is - * returned as an integer. This method blocks until input data is available, end of file is detected, or an - * exception is thrown. - * - * If len is zero, then no bytes are read and 0 is returned; otherwise, there is an attempt to read at - * least one byte. If no byte is available because the stream is at end of file, the value -1 is returned; - * otherwise, at least one byte is read and stored into b. - * - * The first byte read is stored into element b[off], the next one into b[off+1], and so - * on. The number of bytes read is, at most, equal to len. Let k be the number of bytes - * actually read; these bytes will be stored in elements b[off] through b[off+k-1], - * leaving elements b[off+k] through - * b[off+len-1] unaffected. - * - * In every case, elements b[0] through b[off] and elements b[off+len] - * through b[b.length-1] are unaffected. - * - * The read(b, off, len) method for class {@link InputStream} simply calls the method - * read() repeatedly. If the first such - * call results in an IOException, that exception is returned from the call to the - * read(b, off, len) method. If any - * subsequent call to read() results in a IOException, the exception is caught and treated - * as if it were end of file; the bytes read up to that point are stored into b and the number of bytes - * read before the exception occurred is returned. The default implementation of this method blocks until the - * requested amount of input data - * len has been read, end of file is detected, or an exception is thrown. Subclasses are encouraged to - * provide a more efficient implementation of this method. - * - * @param b A byte array which represents the buffer into which the data is read. - * @param off An int which represents the start offset in the byte array at which the data - * is written. - * @param len An int which represents the maximum number of bytes to read. - * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no - * more data because the end of the stream has been reached. - * @throws IOException If the first byte cannot be read for any reason other than end of file, or if the input - * stream has been closed, or if some other I/O error occurs. - * @throws NullPointerException If the byte array b is null. - * @throws IndexOutOfBoundsException If off is negative, len is negative, or - * len is greater than - * b.length - off. - */ - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - if (off < 0 || len < 0 || len > b.length - off) { - throw logger.logExceptionAsError(new IndexOutOfBoundsException()); - } - - return this.readInternal(b, off, len); - } - - /** - * Performs internal read to the given byte buffer. - * - * @param b A byte array which represents the buffer into which the data is read. - * @param off An int which represents the start offset in the byte array b at - * which the data is written. - * @param len An int which represents the maximum number of bytes to read. - * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no - * more data because the end of the stream has been reached. - * @throws IOException If the first byte cannot be read for any reason other than end of file, or if the input - * stream has been closed, or if some other I/O error occurs. - */ - private synchronized int readInternal(final byte[] b, final int off, int len) throws IOException { - this.checkStreamState(); - - // if buffer is empty do next get operation - if ((this.currentBuffer == null || this.currentBuffer.remaining() == 0) - && this.currentAbsoluteReadPosition < this.streamLength + this.blobRangeOffset) { - this.dispatchRead((int) Math.min(this.readSize, - this.streamLength + this.blobRangeOffset - this.currentAbsoluteReadPosition)); - } - - len = Math.min(len, this.readSize); - - final int numberOfBytesRead; - if (currentBuffer == null || currentBuffer.remaining() == 0) { - numberOfBytesRead = -1; - } else { - numberOfBytesRead = Math.min(len, this.currentBuffer.remaining()); - // do read from buffer - this.currentBuffer = this.currentBuffer.get(b, off, numberOfBytesRead); - } - - if (numberOfBytesRead > 0) { - this.currentAbsoluteReadPosition += numberOfBytesRead; - } - - // update markers - if (this.markExpiry > 0 && this.markedPosition + this.markExpiry < this.currentAbsoluteReadPosition) { - this.markedPosition = this.blobRangeOffset; - this.markExpiry = 0; - } - - return numberOfBytesRead; - } - - /** - * Repositions the stream to the given absolute byte offset. - * - * @param absolutePosition A long which represents the absolute byte offset withitn the stream - * reposition. - */ - private synchronized void reposition(final long absolutePosition) { - this.currentAbsoluteReadPosition = absolutePosition; - this.currentBuffer = ByteBuffer.allocate(0); - this.bufferStartOffset = absolutePosition; - } - - /** - * Repositions this stream to the position at the time the mark method was last called on this input stream. Note - * repositioning the blob read stream will disable blob MD5 checking. - * - * @throws IOException If this stream has not been marked or if the mark has been invalidated. - */ - @Override - public synchronized void reset() throws IOException { - if (this.markedPosition + this.markExpiry < this.currentAbsoluteReadPosition) { - throw new IOException(SR.MARK_EXPIRED); - } - this.reposition(this.markedPosition); - } - - /** - * Skips over and discards n bytes of data from this input stream. The skip method may, for a variety of reasons, - * end up skipping over some smaller number of bytes, possibly 0. This may result from any of a number of - * conditions; reaching end of file before n bytes have been skipped is only one possibility. The actual number of - * bytes skipped is returned. If n is negative, no bytes are skipped. - * - * Note repositioning the blob read stream will disable blob MD5 checking. - * - * @param n A long which represents the number of bytes to skip. - */ - @Override - public synchronized long skip(final long n) { - if (n == 0) { - return 0; - } - - if (n < 0 || this.currentAbsoluteReadPosition + n > this.streamLength + this.blobRangeOffset) { - throw logger.logExceptionAsError(new IndexOutOfBoundsException()); - } - - this.reposition(this.currentAbsoluteReadPosition + n); - return n; - } } diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java index acf6d1e6d2a8..0128c8164d3e 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobOutputStream.java @@ -2,7 +2,7 @@ // Licensed under the MIT License. package com.azure.storage.blob.specialized; -import com.azure.storage.blob.BlobAsyncClient; +import com.azure.core.util.logging.ClientLogger; import com.azure.storage.blob.models.AppendBlobAccessConditions; import com.azure.storage.blob.models.AppendPositionAccessConditions; import com.azure.storage.blob.models.BlobAccessConditions; @@ -11,13 +11,12 @@ import com.azure.storage.blob.models.PageRange; import com.azure.storage.blob.models.StorageException; import com.azure.storage.common.SR; +import com.azure.storage.common.StorageOutputStream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import reactor.util.annotation.NonNull; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -25,139 +24,29 @@ import java.util.List; import java.util.UUID; -public abstract class BlobOutputStream extends OutputStream { - /* - * Holds the write threshold of number of bytes to buffer prior to dispatching a write. For block blob this is the - * block size, for page blob this is the Page commit size. - */ - int writeThreshold; +public abstract class BlobOutputStream extends StorageOutputStream { - /* - * Holds the last exception this stream encountered. - */ - volatile IOException lastError; + BlobOutputStream(final int writeThreshold) { + super(writeThreshold); + } static BlobOutputStream appendBlobOutputStream(final AppendBlobAsyncClient client, - final AppendBlobAccessConditions appendBlobAccessConditions) { + final AppendBlobAccessConditions appendBlobAccessConditions) { return new AppendBlobOutputStream(client, appendBlobAccessConditions); } static BlobOutputStream blockBlobOutputStream(final BlockBlobAsyncClient client, - final BlobAccessConditions accessConditions) { + final BlobAccessConditions accessConditions) { return new BlockBlobOutputStream(client, accessConditions); } - static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final long length, - final BlobAccessConditions accessConditions) { - return new PageBlobOutputStream(client, length, accessConditions); + static BlobOutputStream pageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange, + final BlobAccessConditions accessConditions) { + return new PageBlobOutputStream(client, pageRange, accessConditions); } - abstract Mono dispatchWrite(byte[] data, int writeLength, long offset); - abstract void commit(); - /** - * Writes the data to the buffer and triggers writes to the service as needed. - * - * @param data A byte array which represents the data to write. - * @param offset An int which represents the start offset in the data. - * @param length An int which represents the number of bytes to write. - * @throws IOException If an I/O error occurs. In particular, an IOException may be thrown if the output stream has - * been closed. - */ - private void writeInternal(final byte[] data, int offset, int length) { - int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold)); - Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold) - .concatMap(pos -> processChunk(data, pos, offset, length)) - .then() - .block(); - } - - private Mono processChunk(byte[] data, int position, int offset, int length) { - int chunkLength = this.writeThreshold; - - if (position + chunkLength > offset + length) { - chunkLength = offset + length - position; - } - - // Flux chunkData = new ByteBufferStreamFromByteArray(data, writeThreshold, position, chunkLength); - return dispatchWrite(data, chunkLength, position - offset) - .doOnError(t -> { - if (t instanceof IOException) { - lastError = (IOException) t; - } else { - lastError = new IOException(t); - } - }); - } - - /** - * Helper function to check if the stream is faulted, if it is it surfaces the exception. - * - * @throws IOException If an I/O error occurs. In particular, an IOException may be thrown if the output stream has - * been closed. - */ - private void checkStreamState() throws IOException { - if (this.lastError != null) { - throw this.lastError; - } - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. If any data remains in the - * buffer it is committed to the service. - * - * @throws IOException If an I/O error occurs. - */ - @Override - public void flush() throws IOException { - this.checkStreamState(); - } - - /** - * Writes b.length bytes from the specified byte array to this output stream. - *

- * - * @param data A byte array which represents the data to write. - */ - @Override - public void write(@NonNull final byte[] data) { - this.write(data, 0, data.length); - } - - /** - * Writes length bytes from the specified byte array starting at offset to this output stream. - *

- * - * @param data A byte array which represents the data to write. - * @param offset An int which represents the start offset in the data. - * @param length An int which represents the number of bytes to write. - * @throws IndexOutOfBoundsException If {@code offset} or {@code length} are less than {@code 0} or {@code offset} - * plus {@code length} is greater than the {@code data} length. - */ - @Override - public void write(@NonNull final byte[] data, final int offset, final int length) { - if (offset < 0 || length < 0 || length > data.length - offset) { - throw new IndexOutOfBoundsException(); - } - - this.writeInternal(data, offset, length); - } - - /** - * Writes the specified byte to this output stream. The general contract for write is that one byte is written to - * the output stream. The byte to be written is the eight low-order bits of the argument b. The 24 high-order bits - * of b are ignored. - *

- * true is acceptable for you. - * - * @param byteVal An int which represents the bye value to write. - */ - @Override - public void write(final int byteVal) { - this.write(new byte[]{(byte) (byteVal & 0xFF)}); - } - /** * Closes this output stream and releases any system resources associated with this stream. If any data remains in * the buffer it is committed to the service. @@ -193,9 +82,9 @@ private static final class AppendBlobOutputStream extends BlobOutputStream { private final AppendBlobAsyncClient client; private AppendBlobOutputStream(final AppendBlobAsyncClient client, - final AppendBlobAccessConditions appendBlobAccessConditions) { + final AppendBlobAccessConditions appendBlobAccessConditions) { + super(AppendBlobClient.MAX_APPEND_BLOCK_BYTES); this.client = client; - this.writeThreshold = BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE; this.appendBlobAccessConditions = appendBlobAccessConditions; if (appendBlobAccessConditions != null) { @@ -226,7 +115,7 @@ private Mono appendBlock(Flux blockData, long offset, long wri } @Override - Mono dispatchWrite(byte[] data, int writeLength, long offset) { + protected Mono dispatchWrite(byte[] data, int writeLength, long offset) { if (writeLength == 0) { return Mono.empty(); } @@ -244,7 +133,7 @@ Mono dispatchWrite(byte[] data, int writeLength, long offset) { Flux fbb = Flux.range(0, 1) .concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength))); - return this.appendBlock(fbb.subscribeOn(Schedulers.elastic()), offset, writeLength); + return this.appendBlock(fbb.subscribeOn(Schedulers.elastic()), this.initialBlobOffset, writeLength); } @Override @@ -260,11 +149,11 @@ private static final class BlockBlobOutputStream extends BlobOutputStream { private final BlockBlobAsyncClient client; private BlockBlobOutputStream(final BlockBlobAsyncClient client, final BlobAccessConditions accessConditions) { + super(BlockBlobClient.MAX_STAGE_BLOCK_BYTES); this.client = client; this.accessConditions = accessConditions; this.blockIdPrefix = UUID.randomUUID().toString() + '-'; this.blockList = new ArrayList<>(); - this.writeThreshold = BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE; } /** @@ -291,7 +180,7 @@ private Mono writeBlock(Flux blockData, String blockId, long w } @Override - Mono dispatchWrite(byte[] data, int writeLength, long offset) { + protected Mono dispatchWrite(byte[] data, int writeLength, long offset) { if (writeLength == 0) { return Mono.empty(); } @@ -315,13 +204,16 @@ synchronized void commit() { } private static final class PageBlobOutputStream extends BlobOutputStream { + private final ClientLogger logger = new ClientLogger(PageBlobOutputStream.class); private final PageBlobAsyncClient client; private final PageBlobAccessConditions pageBlobAccessConditions; + private final PageRange pageRange; - private PageBlobOutputStream(final PageBlobAsyncClient client, final long length, - final BlobAccessConditions blobAccessConditions) { + private PageBlobOutputStream(final PageBlobAsyncClient client, final PageRange pageRange, + final BlobAccessConditions blobAccessConditions) { + super(PageBlobClient.MAX_PUT_PAGES_BYTES); this.client = client; - this.writeThreshold = (int) Math.min(BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE, length); + this.pageRange = pageRange; if (blobAccessConditions != null) { this.pageBlobAccessConditions = new PageBlobAccessConditions() @@ -332,8 +224,8 @@ private PageBlobOutputStream(final PageBlobAsyncClient client, final long length } } - private Mono writePages(Flux pageData, long offset, long writeLength) { - return client.uploadPagesWithResponse(new PageRange().setStart(offset).setEnd(offset + writeLength - 1), + private Mono writePages(Flux pageData, int length, long offset) { + return client.uploadPagesWithResponse(new PageRange().setStart(offset).setEnd(offset + length - 1), pageData, pageBlobAccessConditions) .then() .onErrorResume(t -> t instanceof StorageException, e -> { @@ -343,7 +235,7 @@ private Mono writePages(Flux pageData, long offset, long write } @Override - Mono dispatchWrite(byte[] data, int writeLength, long offset) { + protected Mono dispatchWrite(byte[] data, int writeLength, long offset) { if (writeLength == 0) { return Mono.empty(); } @@ -356,7 +248,13 @@ Mono dispatchWrite(byte[] data, int writeLength, long offset) { Flux fbb = Flux.range(0, 1) .concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength))); - return this.writePages(fbb.subscribeOn(Schedulers.elastic()), offset, writeLength); + long pageOffset = pageRange.getStart(); + if (pageOffset + writeLength - 1 > pageRange.getEnd()) { + throw logger.logExceptionAsError( + new RuntimeException("The input data length is larger than the page range.")); + } + pageRange.setStart(pageRange.getStart() + writeLength); + return this.writePages(fbb.subscribeOn(Schedulers.elastic()), writeLength, pageOffset); } @Override diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/PageBlobClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/PageBlobClient.java index d66238be568a..d017156fc765 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/PageBlobClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/PageBlobClient.java @@ -69,28 +69,30 @@ public final class PageBlobClient extends BlobClientBase { * Creates and opens an output stream to write data to the page blob. If the blob already exists on the service, it * will be overwritten. * - * @param length A long which represents the length, in bytes, of the stream to create. This value must - * be a multiple of 512. + * @param pageRange A {@link PageRange} object. Given that pages must be aligned with 512-byte boundaries, the start + * offset must be a modulus of 512 and the end offset must be a modulus of 512 - 1. Examples of valid byte ranges + * are 0-511, 512-1023, etc. * @return A {@link BlobOutputStream} object used to write data to the blob. * @throws StorageException If a storage service error occurred. */ - public BlobOutputStream getBlobOutputStream(long length) { - return getBlobOutputStream(length, null); + public BlobOutputStream getBlobOutputStream(PageRange pageRange) { + return getBlobOutputStream(pageRange, null); } /** * Creates and opens an output stream to write data to the page blob. If the blob already exists on the service, it * will be overwritten. * - * @param length A long which represents the length, in bytes, of the stream to create. This value must - * be a multiple of 512. + * @param pageRange A {@link PageRange} object. Given that pages must be aligned with 512-byte boundaries, the start + * offset must be a modulus of 512 and the end offset must be a modulus of 512 - 1. Examples of valid byte ranges + * are 0-511, 512-1023, etc. * @param accessConditions A {@link BlobAccessConditions} object that represents the access conditions for the * blob. * @return A {@link BlobOutputStream} object used to write data to the blob. * @throws StorageException If a storage service error occurred. */ - public BlobOutputStream getBlobOutputStream(long length, BlobAccessConditions accessConditions) { - return BlobOutputStream.pageBlobOutputStream(pageBlobAsyncClient, length, accessConditions); + public BlobOutputStream getBlobOutputStream(PageRange pageRange, BlobAccessConditions accessConditions) { + return BlobOutputStream.pageBlobOutputStream(pageBlobAsyncClient, pageRange, accessConditions); } /** diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy index 5e64352115db..6ca80bf94c09 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobOutputStreamTest.groovy @@ -1,8 +1,7 @@ package com.azure.storage.blob - +import com.azure.storage.blob.models.PageRange import com.azure.storage.common.Constants -import spock.lang.Ignore import spock.lang.Requires class BlobOutputStreamTest extends APISpec { @@ -33,7 +32,7 @@ class BlobOutputStreamTest extends APISpec { when: - def outputStream = pageBlobClient.getBlobOutputStream(data.length) + def outputStream = pageBlobClient.getBlobOutputStream(new PageRange().setStart(0).setEnd(16 * Constants.MB - 1)) outputStream.write(data) outputStream.close() @@ -42,7 +41,7 @@ class BlobOutputStreamTest extends APISpec { } // Test is failing, need to investigate. - @Ignore + @Requires({ liveMode() }) def "AppendBlob output stream"() { setup: def data = getRandomByteArray(4 * FOUR_MB) @@ -52,7 +51,7 @@ class BlobOutputStreamTest extends APISpec { when: def outputStream = appendBlobClient.getBlobOutputStream() for (int i = 0; i != 4; i++) { - outputStream.write(Arrays.copyOfRange(data, i * FOUR_MB, ((i + 1) * FOUR_MB) - 1)) + outputStream.write(Arrays.copyOfRange(data, i * FOUR_MB, ((i + 1) * FOUR_MB))) } outputStream.close() diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy index fd9d6a78c964..ded4d9b8b9f2 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlockBlobInputOutputStreamTest.groovy @@ -1,6 +1,6 @@ package com.azure.storage.blob - +import com.azure.storage.blob.specialized.BlobOutputStream import com.azure.storage.blob.specialized.BlockBlobClient import com.azure.storage.common.Constants import spock.lang.Requires @@ -16,11 +16,11 @@ class BlockBlobInputOutputStreamTest extends APISpec { @Requires({ liveMode() }) def "Upload download"() { when: - def length = 30 * Constants.MB - def randomBytes = getRandomByteArray(length) + int length = 6 * Constants.MB + byte[] randomBytes = getRandomByteArray(length) - def outStream = bc.getBlobOutputStream() - outStream.write(randomBytes) + BlobOutputStream outStream = bc.getBlobOutputStream() + outStream.write(randomBytes, 1 * Constants.MB, 5 * Constants.MB) outStream.close() then: @@ -34,6 +34,7 @@ class BlockBlobInputOutputStreamTest extends APISpec { } catch (IOException ex) { throw new UncheckedIOException(ex) } - assert outputStream.toByteArray() == randomBytes + byte[] randomBytes2 = outputStream.toByteArray() + assert randomBytes2 == Arrays.copyOfRange(randomBytes, 1 * Constants.MB, 6 * Constants.MB) } } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy index 202fedb60c96..f506628933aa 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy @@ -13,6 +13,7 @@ import com.azure.core.http.policy.HttpPipelinePolicy import com.azure.core.util.Context import com.azure.storage.blob.APISpec import com.azure.storage.blob.BlobAsyncClient +import com.azure.storage.blob.BlobClient import com.azure.storage.blob.BlobServiceClientBuilder import com.azure.storage.blob.models.AccessTier import com.azure.storage.blob.models.BlobAccessConditions @@ -42,11 +43,13 @@ class BlockBlobAPITest extends APISpec { BlockBlobClient bc BlockBlobAsyncClient bac BlobAsyncClient blobac + BlobClient blobClient String blobName def setup() { blobName = generateBlobName() - bc = cc.getBlobClient(blobName).getBlockBlobClient() + blobClient = cc.getBlobClient(blobName) + bc = blobClient.getBlockBlobClient() bc.upload(defaultInputStream.get(), defaultDataSize) blobac = ccAsync.getBlobAsyncClient(generateBlobName()) bac = blobac.getBlockBlobAsyncClient() @@ -602,7 +605,7 @@ class BlockBlobAPITest extends APISpec { def outStream = new ByteArrayOutputStream() when: - bc.uploadFromFile(file.getAbsolutePath()) + blobClient.uploadFromFile(file.getAbsolutePath()) then: bc.download(outStream) @@ -617,7 +620,7 @@ class BlockBlobAPITest extends APISpec { def outStream = new ByteArrayOutputStream() when: - bc.uploadFromFile(file.getAbsolutePath(), null, null, metadata, null, null, null) + blobClient.uploadFromFile(file.getAbsolutePath(), null, null, metadata, null, null, null) then: metadata == bc.getProperties().getMetadata() @@ -804,7 +807,7 @@ class BlockBlobAPITest extends APISpec { def data = getRandomData(dataSize) ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions() .setBlockSize(bufferSize).setNumBuffers(numBuffs) - bac.upload(Flux.just(data), parallelTransferOptions).block() + blobac.upload(Flux.just(data), parallelTransferOptions).block() data.position(0) then: @@ -852,7 +855,7 @@ class BlockBlobAPITest extends APISpec { .setBlockSize(bufferSize).setNumBuffers(numBuffers) def dataList = [] as List dataSizeList.each { size -> dataList.add(getRandomData(size)) } - bac.upload(Flux.fromIterable(dataList), parallelTransferOptions).block() + blobac.upload(Flux.fromIterable(dataList), parallelTransferOptions).block() expect: compareListToBuffer(dataList, collectBytesInBuffer(bac.download()).block()) @@ -926,7 +929,7 @@ class BlockBlobAPITest extends APISpec { @Requires({ liveMode() }) def "Buffered upload metadata"() { setup: - def metadata = [] as Map + def metadata = [:] as Map if (key1 != null) { metadata.put(key1, value1) } diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageInputStream.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageInputStream.java new file mode 100644 index 000000000000..1e6bd147fe60 --- /dev/null +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageInputStream.java @@ -0,0 +1,391 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.common; + +import com.azure.core.util.logging.ClientLogger; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Provides an input stream to read a given storage resource. + */ +public abstract class StorageInputStream extends InputStream { + private final ClientLogger logger = new ClientLogger(StorageInputStream.class); + + /** + * A flag to determine if the stream is faulted, if so the last error will be thrown on next operation. + */ + protected volatile boolean streamFaulted; + + /** + * Holds the last exception this stream encountered. + */ + protected IOException lastError; + + + /** + * Holds the reference to the current buffered data. + */ + private ByteBuffer currentBuffer; + + /** + * Holds an absolute byte position for the mark feature. + */ + private long markedPosition; + + /** + * Holds the mark delta for which the mark position is expired. + */ + private int markExpiry; + + /** + * Holds an absolute byte position of the current read position. + */ + private long currentAbsoluteReadPosition; + + /** + * Holds the absolute byte position of the start of the current buffer. + */ + protected long bufferStartOffset; + + /** + * Holds the length of the current buffer in bytes. + */ + protected int bufferSize; + + /** + * Offset of the source blob this class is configured to stream from. + */ + private final long rangeOffset; + + /** + * Holds the stream read size. + */ + private final int chunkSize; + + /** + * Holds the stream length. + */ + private final long streamLength; + + /** + * Initializes a new instance of the StorageInputStream class. + * + * @param chunkSize the size of chunk allowed to pass for storage service request. + * @param contentLength the actual content length for input data. + */ + protected StorageInputStream(final int chunkSize, final long contentLength) { + this(0, null, chunkSize, contentLength); + } + + /** + * Initializes a new instance of the StorageInputStream class. + * + * @param rangeOffset The offset of the data to begin stream. + * @param rangeLength How much data the stream should return after blobRangeOffset. + * @param chunkSize Holds the stream read size. + * @param contentLength The length of the stream to be transferred. + * @throws IndexOutOfBoundsException when range offset is less than 0 or rangeLength exists but les than or + * equal to 0. + */ + protected StorageInputStream(long rangeOffset, final Long rangeLength, + final int chunkSize, final long contentLength) { + this.rangeOffset = rangeOffset; + this.streamFaulted = false; + this.currentAbsoluteReadPosition = rangeOffset; + this.chunkSize = chunkSize; + this.streamLength = rangeLength == null ? contentLength - this.rangeOffset + : Math.min(contentLength - this.rangeOffset, rangeLength); + if (rangeOffset < 0 || (rangeLength != null && rangeLength <= 0)) { + throw logger.logExceptionAsError(new IndexOutOfBoundsException()); + } + + this.reposition(rangeOffset); + } + + /** + * Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without + * blocking by the next invocation of a method for this input stream. The next invocation might be the same thread + * or another thread. A single read or skip of this many bytes will not block, but may read or skip fewer bytes. + * + * @return An int which represents an estimate of the number of bytes that can be read (or skipped + * over) from this input stream without blocking, or 0 when it reaches the end of the input stream. + */ + @Override + public synchronized int available() { + return this.bufferSize - (int) (this.currentAbsoluteReadPosition - this.bufferStartOffset); + } + + /** + * Helper function to check if the stream is faulted, if it is it surfaces the exception. + * + * @throws RuntimeException If an I/O error occurs. In particular, an IOException may be thrown if the output stream + * has been closed. + */ + private synchronized void checkStreamState() { + if (this.streamFaulted) { + throw logger.logExceptionAsError(new RuntimeException(this.lastError.getMessage())); + } + } + + /** + * Closes this input stream and releases any system resources associated with the stream. + * + * @throws IOException If an I/O error occurs. + */ + @Override + public synchronized void close() { + this.currentBuffer = null; + this.streamFaulted = true; + this.lastError = new IOException(SR.STREAM_CLOSED); + } + + /** + * Dispatches a read operation of N bytes. + * + * @param readLength An int which represents the number of bytes to read. + * @param offset The start point of data to be acquired. + * @return The bytebuffer which store one chunk size of data. + * @throws IOException If an I/O error occurs. + */ + protected abstract ByteBuffer dispatchRead(int readLength, long offset) throws IOException; + + /** + * Marks the current position in this input stream. A subsequent call to the reset method repositions this stream at + * the last marked position so that subsequent reads re-read the same bytes. + * + * @param readlimit An int which represents the maximum limit of bytes that can be read before the mark + * position becomes invalid. + */ + @Override + public synchronized void mark(final int readlimit) { + this.markedPosition = this.currentAbsoluteReadPosition; + this.markExpiry = readlimit; + } + + /** + * Tests if this input stream supports the mark and reset methods. Whether or not mark and reset are supported is an + * invariant property of a particular input stream instance. The markSupported method of {@link InputStream} returns + * false. + * + * @return True if this stream instance supports the mark and reset methods; False + * otherwise. + */ + @Override + public boolean markSupported() { + return true; + } + + /** + * Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. If + * no byte is available because the end of the stream has been reached, the value -1 is returned. This method blocks + * until input data is available, the end of the stream is detected, or an exception is thrown. + * + * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no + * more data because the end of the stream has been reached. + * @throws RuntimeException when no available bytes to read. + * @throws IOException If an I/O error occurs. + */ + @Override + public int read() throws IOException { + final byte[] tBuff = new byte[1]; + final int numberOfBytesRead = this.read(tBuff, 0, 1); + + if (numberOfBytesRead > 0) { + return tBuff[0] & 0xFF; + } else if (numberOfBytesRead == 0) { + throw logger.logExceptionAsError(new RuntimeException(SR.UNEXPECTED_STREAM_READ_ERROR)); + } else { + return -1; + } + } + + /** + * Reads some number of bytes from the input stream and stores them into the buffer array b. The number + * of bytes actually read is returned as an integer. This method blocks until input data is available, end of file + * is detected, or an exception is thrown. If the length of b is zero, then no bytes are read and 0 is + * returned; otherwise, there is an attempt to read at least one byte. If no byte is available because the stream is + * at the end of the file, the value -1 is returned; otherwise, at least one byte is read and stored into + * b. + * + * The first byte read is stored into element b[0], the next one into b[1], and so on. The + * number of bytes read is, at most, equal to the length of b. Let k be the number of + * bytes actually read; these bytes will be stored in elements b[0] through b[k-1], + * leaving elements b[k] through + * b[b.length-1] unaffected. + * + * The read(b) method for class {@link InputStream} has the same effect as: + * + * read(b, 0, b.length) + * + * @param b A byte array which represents the buffer into which the data is read. + * @throws IOException If the first byte cannot be read for any reason other than the end of the file, if the input + * stream has been closed, or if some other I/O error occurs. + * @throws NullPointerException If the byte array b is null. + */ + @Override + public int read(final byte[] b) throws IOException { + return this.read(b, 0, b.length); + } + + /** + * Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to + * read as many as len bytes, but a smaller number may be read. The number of bytes actually read is + * returned as an integer. This method blocks until input data is available, end of file is detected, or an + * exception is thrown. + * + * If len is zero, then no bytes are read and 0 is returned; otherwise, there is an attempt to read at + * least one byte. If no byte is available because the stream is at end of file, the value -1 is returned; + * otherwise, at least one byte is read and stored into b. + * + * The first byte read is stored into element b[off], the next one into b[off+1], and so + * on. The number of bytes read is, at most, equal to len. Let k be the number of bytes + * actually read; these bytes will be stored in elements b[off] through b[off+k-1], + * leaving elements b[off+k] through + * b[off+len-1] unaffected. + * + * In every case, elements b[0] through b[off] and elements b[off+len] + * through b[b.length-1] are unaffected. + * + * The read(b, off, len) method for class {@link InputStream} simply calls the method + * read() repeatedly. If the first such + * call results in an IOException, that exception is returned from the call to the + * read(b, off, len) method. If any + * subsequent call to read() results in a IOException, the exception is caught and treated + * as if it were end of file; the bytes read up to that point are stored into b and the number of bytes + * read before the exception occurred is returned. The default implementation of this method blocks until the + * requested amount of input data + * len has been read, end of file is detected, or an exception is thrown. Subclasses are encouraged to + * provide a more efficient implementation of this method. + * + * @param b A byte array which represents the buffer into which the data is read. + * @param off An int which represents the start offset in the byte array at which the data + * is written. + * @param len An int which represents the maximum number of bytes to read. + * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no + * more data because the end of the stream has been reached. + * @throws IOException If the first byte cannot be read for any reason other than end of file, or if the input + * stream has been closed, or if some other I/O error occurs. + * @throws NullPointerException If the byte array b is null. + * @throws IndexOutOfBoundsException If off is negative, len is negative, or + * len is greater than + * b.length - off. + */ + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (off < 0 || len < 0 || len > b.length - off) { + throw logger.logExceptionAsError(new IndexOutOfBoundsException()); + } + + int chunks = (int) (Math.ceil((double) len / (double) this.chunkSize)); + int numOfBytesRead = 0; + for (int i = 0; i < chunks; i++) { + int results = this.readInternal(b, off + numOfBytesRead, len); + if (results == -1) { + return -1; + } + numOfBytesRead += results; + } + return numOfBytesRead; + } + + /** + * Performs internal read to the given byte buffer. + * + * @param b A byte array which represents the buffer into which the data is read. + * @param off An int which represents the start offset in the byte array b at + * which the data is written. + * @param len An int which represents the maximum number of bytes to read. + * @return An int which represents the total number of bytes read into the buffer, or -1 if there is no + * more data because the end of the stream has been reached. + * @throws IOException If the first byte cannot be read for any reason other than end of file, or if the input + * stream has been closed, or if some other I/O error occurs. + */ + private synchronized int readInternal(final byte[] b, final int off, int len) throws IOException { + this.checkStreamState(); + + // if buffer is empty do next get operation + if ((this.currentBuffer == null || this.currentBuffer.remaining() == 0) + && this.currentAbsoluteReadPosition < this.streamLength + this.rangeOffset) { + this.currentBuffer = this.dispatchRead((int) Math.min(this.chunkSize, + this.streamLength + this.rangeOffset - this.currentAbsoluteReadPosition), + this.currentAbsoluteReadPosition); + } + + len = Math.min(len, this.chunkSize); + + final int numberOfBytesRead; + if (currentBuffer.remaining() == 0) { + numberOfBytesRead = -1; + } else { + numberOfBytesRead = Math.min(len, this.currentBuffer.remaining()); + // do read from buffer + this.currentBuffer = this.currentBuffer.get(b, off, numberOfBytesRead); + } + + if (numberOfBytesRead > 0) { + this.currentAbsoluteReadPosition += numberOfBytesRead; + } + + // update markers + if (this.markExpiry > 0 && this.markedPosition + this.markExpiry < this.currentAbsoluteReadPosition) { + this.markedPosition = this.rangeOffset; + this.markExpiry = 0; + } + + return numberOfBytesRead; + } + + /** + * Repositions the stream to the given absolute byte offset. + * + * @param absolutePosition A long which represents the absolute byte offset withitn the stream + * reposition. + */ + private synchronized void reposition(final long absolutePosition) { + this.currentAbsoluteReadPosition = absolutePosition; + this.currentBuffer = ByteBuffer.allocate(0); + this.bufferStartOffset = absolutePosition; + } + + /** + * Repositions this stream to the position at the time the mark method was last called on this input stream. Note + * repositioning the blob read stream will disable blob MD5 checking. + * + * @throws RuntimeException If this stream has not been marked or if the mark has been invalidated. + */ + @Override + public synchronized void reset() { + if (this.markedPosition + this.markExpiry < this.currentAbsoluteReadPosition) { + throw logger.logExceptionAsError(new RuntimeException(SR.MARK_EXPIRED)); + } + this.reposition(this.markedPosition); + } + + /** + * Skips over and discards n bytes of data from this input stream. The skip method may, for a variety of reasons, + * end up skipping over some smaller number of bytes, possibly 0. This may result from any of a number of + * conditions; reaching end of file before n bytes have been skipped is only one possibility. The actual number of + * bytes skipped is returned. If n is negative, no bytes are skipped. + * + * Note repositioning the blob read stream will disable blob MD5 checking. + * + * @param n A long which represents the number of bytes to skip. + */ + @Override + public synchronized long skip(final long n) { + if (n == 0) { + return 0; + } + + if (n < 0 || this.currentAbsoluteReadPosition + n > this.streamLength + this.rangeOffset) { + throw logger.logExceptionAsError(new IndexOutOfBoundsException()); + } + + this.reposition(this.currentAbsoluteReadPosition + n); + return n; + } +} diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java new file mode 100644 index 000000000000..8ca6af592473 --- /dev/null +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/StorageOutputStream.java @@ -0,0 +1,155 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.common; + +import com.azure.core.util.logging.ClientLogger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.annotation.NonNull; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * StorageOutputStream + */ +public abstract class StorageOutputStream extends OutputStream { + final ClientLogger logger = new ClientLogger(StorageOutputStream.class); + + /* + * Holds the write threshold of number of bytes to buffer prior to dispatching a write. For block blob this is the + * block size, for page blob this is the Page commit size. + */ + private final int writeThreshold; + + /* + * Holds the last exception this stream encountered. + */ + protected volatile IOException lastError; + + protected abstract Mono dispatchWrite(byte[] data, int writeLength, long offset); + + protected StorageOutputStream(final int writeThreshold) { + this.writeThreshold = writeThreshold; + } + + /** + * Writes the data to the buffer and triggers writes to the service as needed. + * + * @param data A byte array which represents the data to write. + * @param offset An int which represents the start offset in the data. + * @param length An int which represents the number of bytes to write. + */ + private void writeInternal(final byte[] data, int offset, int length) { + int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold)); + Flux.range(0, chunks).map(c -> offset + c * this.writeThreshold) + .concatMap(pos -> processChunk(data, pos, offset, length)) + .then() + .block(); + } + + private Mono processChunk(byte[] data, int position, int offset, int length) { + int chunkLength = this.writeThreshold; + + if (position + chunkLength > offset + length) { + chunkLength = offset + length - position; + } + + // Flux chunkData = new ByteBufferStreamFromByteArray(data, writeThreshold, position, chunkLength); + return dispatchWrite(data, chunkLength, position) + .doOnError(t -> { + if (t instanceof IOException) { + lastError = (IOException) t; + } else { + lastError = new IOException(t); + } + }); + } + + /** + * Helper function to check if the stream is faulted, if it is it surfaces the exception. + * + * @throws RuntimeException If an I/O error occurs. In particular, an IOException may be thrown + * if the output stream has been closed. + */ + protected void checkStreamState() { + if (this.lastError != null) { + throw logger.logExceptionAsError(new RuntimeException(this.lastError.getMessage())); + } + } + + /** + * Flushes this output stream and forces any buffered output bytes to be written out. If any data remains in the + * buffer it is committed to the service. + */ + @Override + public void flush() { + this.checkStreamState(); + } + + /** + * Writes b.length bytes from the specified byte array to this output stream. + *

+ * + * @param data A byte array which represents the data to write. + */ + @Override + public void write(@NonNull final byte[] data) { + this.write(data, 0, data.length); + } + + /** + * Writes length bytes from the specified byte array starting at offset to this output stream. + *

+ * + * @param data A byte array which represents the data to write. + * @param offset An int which represents the start offset in the data. + * @param length An int which represents the number of bytes to write. + * @throws IndexOutOfBoundsException when access the bytes out of the bound. + */ + @Override + public void write(@NonNull final byte[] data, final int offset, final int length) { + if (offset < 0 || length < 0 || length > data.length - offset) { + throw logger.logExceptionAsError(new IndexOutOfBoundsException()); + } + + this.writeInternal(data, offset, length); + } + + /** + * Writes the specified byte to this output stream. The general contract for write is that one byte is written to + * the output stream. The byte to be written is the eight low-order bits of the argument b. The 24 high-order bits + * of b are ignored. + *

+ * true is acceptable for you. + * + * @param byteVal An int which represents the bye value to write. + */ + @Override + public void write(final int byteVal) { + this.write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + /** + * Closes this output stream and releases any system resources associated with this stream. If any data remains in + * the buffer it is committed to the service. + * + * @throws IOException If an I/O error occurs. + */ + @Override + public synchronized void close() throws IOException { + try { + // if the user has already closed the stream, this will throw a STREAM_CLOSED exception + // if an exception was thrown by any thread in the threadExecutor, realize it now + this.checkStreamState(); + + // flush any remaining data + this.flush(); + } finally { + // if close() is called again, an exception will be thrown + this.lastError = new IOException(SR.STREAM_CLOSED); + } + } + +} diff --git a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/FileClient.java b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/FileClient.java index 082857cf1893..65facb96362c 100644 --- a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/FileClient.java +++ b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/FileClient.java @@ -48,7 +48,7 @@ public class FileClient { private final FileAsyncClient fileAsyncClient; /** - * Creates a FileClient that wraps a FileAsyncClient and blocks requests. + * Creates a FileClient that wraps a FileAsyncClient and requests. * * @param fileAsyncClient FileAsyncClient that is used to send requests */ @@ -65,6 +65,53 @@ public String getFileUrl() { return fileAsyncClient.getFileUrl(); } + /** + * Opens a file input stream to download the file. + *

+ * + * @return An InputStream object that represents the stream to use for reading from the file. + * @throws StorageException If a storage service error occurred. + */ + public final StorageFileInputStream openInputStream() { + return openInputStream(new FileRange(0)); + } + + /** + * Opens a file input stream to download the specified range of the file. + *

+ * + * @param range {@link FileRange} + * @return An InputStream object that represents the stream to use for reading from the file. + * @throws StorageException If a storage service error occurred. + */ + public final StorageFileInputStream openInputStream(FileRange range) { + return new StorageFileInputStream(fileAsyncClient, range.getStart(), range.getEnd()); + } + + /** + * Creates and opens an output stream to write data to the file. If the file already exists on the service, it + * will be overwritten. + * + * @return A {@link StorageFileOutputStream} object used to write data to the file. + * @throws StorageException If a storage service error occurred. + */ + public final StorageFileOutputStream getFileOutputStream() { + return getFileOutputStream(0); + } + + /** + * Creates and opens an output stream to write data to the file. If the file already exists on the service, it + * will be overwritten. + * + * @param offset Optional starting point of the upload range. It will start from the beginning if it is + * {@code null} + * @return A {@link StorageFileOutputStream} object used to write data to the file. + * @throws StorageException If a storage service error occurred. + */ + public final StorageFileOutputStream getFileOutputStream(long offset) { + return new StorageFileOutputStream(fileAsyncClient, offset); + } + /** * Creates a file in the storage account and returns a response of {@link FileInfo} to interact with it. * diff --git a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileInputStream.java b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileInputStream.java new file mode 100644 index 000000000000..8e7138119692 --- /dev/null +++ b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileInputStream.java @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.file; + +import com.azure.core.implementation.util.FluxUtil; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.common.Constants; +import com.azure.storage.common.StorageInputStream; +import com.azure.storage.file.models.FileRange; +import com.azure.storage.file.models.StorageException; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Provides an input stream to read a given storage file resource. + */ +public class StorageFileInputStream extends StorageInputStream { + final ClientLogger logger = new ClientLogger(StorageFileInputStream.class); + + private final FileAsyncClient fileAsyncClient; + + /** + * Initializes a new instance of the StorageFileInputStream class. + * + * @param fileAsyncClient A {@link FileClient} object which represents the blob that this stream is associated with. + * @throws StorageException An exception representing any error which occurred during the operation. + */ + StorageFileInputStream(final FileAsyncClient fileAsyncClient) + throws StorageException { + this(fileAsyncClient, 0, null); + } + + /** + * Initializes a new instance of the StorageFileInputStream class. Note that if {@code fileRangeOffset} is not + * {@code 0} or {@code fileRangeLength} is not {@code null}, there will be no content MD5 verification. + * + * @param fileAsyncClient A {@link FileAsyncClient} object which represents the blob + * that this stream is associated with. + * @param fileRangeOffset The offset of file range data to begin stream. + * @param fileRangeLength How much data the stream should return after fileRangeOffset. + * @throws StorageException An exception representing any error which occurred during the operation. + */ + StorageFileInputStream(final FileAsyncClient fileAsyncClient, long fileRangeOffset, Long fileRangeLength) + throws StorageException { + super(fileRangeOffset, fileRangeLength, 4 * Constants.MB, + fileAsyncClient.getProperties().block().getContentLength()); + this.fileAsyncClient = fileAsyncClient; + } + + /** + * Dispatches a read operation of N bytes. + * + * @param readLength An int which represents the number of bytes to read. + */ + @Override + protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) { + try { + ByteBuffer currentBuffer = this.fileAsyncClient.downloadWithPropertiesWithResponse(new FileRange(offset, + offset + readLength - 1), false) + .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue().getBody()) + .map(ByteBuffer::wrap)) + .block(); + + this.bufferSize = readLength; + this.bufferStartOffset = offset; + return currentBuffer; + } catch (final StorageException e) { + this.streamFaulted = true; + this.lastError = new IOException(e); + throw logger.logExceptionAsError(new RuntimeException(this.lastError.getMessage())); + } + } +} diff --git a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileOutputStream.java b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileOutputStream.java new file mode 100644 index 000000000000..9dcf9706e30e --- /dev/null +++ b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/StorageFileOutputStream.java @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.file; + +import com.azure.storage.common.Constants; +import com.azure.storage.common.StorageOutputStream; +import com.azure.storage.file.models.StorageException; +import java.io.IOException; +import java.nio.ByteBuffer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class StorageFileOutputStream extends StorageOutputStream { + private long offsetPos; + + private final FileAsyncClient client; + + StorageFileOutputStream(final FileAsyncClient client, long offsetPos) { + super(4 * Constants.MB); + this.client = client; + this.offsetPos = offsetPos; + } + + private Mono uploadData(Flux inputData, long writeLength, long offset) { + return client.uploadWithResponse(inputData, writeLength, offset) + .then() + .onErrorResume(t -> t instanceof IOException || t instanceof StorageException, e -> { + this.lastError = new IOException(e); + return null; + }); + } + + @Override + protected Mono dispatchWrite(byte[] data, int writeLength, long offset) { + if (writeLength == 0) { + return Mono.empty(); + } + + Flux fbb = Flux.range(0, 1) + .concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength))); + + long fileOffset = this.offsetPos; + this.offsetPos = this.offsetPos + writeLength; + + return this.uploadData(fbb.subscribeOn(Schedulers.elastic()), writeLength, fileOffset); + } +} diff --git a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/models/FileRange.java b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/models/FileRange.java index 4d8fa259796a..9d60485acf3c 100644 --- a/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/models/FileRange.java +++ b/sdk/storage/azure-storage-file/src/main/java/com/azure/storage/file/models/FileRange.java @@ -3,22 +3,44 @@ package com.azure.storage.file.models; -import java.util.Objects; +import com.azure.core.util.logging.ClientLogger; +import java.util.Locale; /** * The range of a file in the storage file service. */ public final class FileRange { + final ClientLogger logger = new ClientLogger(FileRange.class); + private static final String RANGE_HEADER_FORMAT = "bytes=%d-%d"; + private static final String BEGIN_RANGE_HEADER_FORMAT = "bytes=%d-"; private final long start; private final Long end; + /** + * Create an instance of the range of a file. Specify the start the range + * and the end defaults to the length of the file. + * @param start Specifies the start of bytes to be written. + */ + public FileRange(final long start) { + this(start, null); + } + /** * Create an instance of the range of a file. Both the start and end of the range must be specified. * @param start Specifies the start of bytes to be written. * @param end Specifies the end of bytes to be written */ public FileRange(final long start, final Long end) { + if (start < 0) { + throw logger.logExceptionAsError(new IllegalArgumentException( + "FileRange offset must be greater than or equal to 0.")); + } this.start = start; + + if (end != null && end < 0) { + throw logger.logExceptionAsError(new IllegalArgumentException(new IllegalArgumentException( + "FileRange end must be greater than or equal to 0 if specified."))); + } this.end = end; } @@ -51,7 +73,22 @@ public Long getEnd() { */ @Override public String toString() { - String endString = Objects.toString(end); - return "bytes=" + String.valueOf(start) + "-" + endString; + if (this.end != null) { + return String.format(Locale.ROOT, RANGE_HEADER_FORMAT, this.start, this.end); + } + + return String.format(Locale.ROOT, BEGIN_RANGE_HEADER_FORMAT, this.start); + } + + /** + * @return {@link FileRange#toString()} if {@code count} isn't {@code null} or {@code offset} isn't 0, otherwise + * null. + */ + public String toHeaderValue() { + // The default values of a BlobRange + if (this.start == 0 && this.end == null) { + return null; + } + return this.toString(); } } diff --git a/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/APISpec.groovy b/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/APISpec.groovy index 9f7ef550b3aa..0707a8630f06 100644 --- a/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/APISpec.groovy +++ b/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/APISpec.groovy @@ -18,6 +18,7 @@ import com.azure.storage.file.FileServiceClient import com.azure.storage.file.FileServiceClientBuilder import com.azure.storage.file.ShareClientBuilder import com.azure.storage.file.models.ListSharesOptions +import spock.lang.Shared import spock.lang.Specification import java.time.Duration @@ -25,8 +26,8 @@ import java.time.OffsetDateTime class APISpec extends Specification { // Field common used for all APIs. - def logger = new ClientLogger(APISpec.class) - def AZURE_TEST_MODE = "AZURE_TEST_MODE" + static ClientLogger logger = new ClientLogger(APISpec.class) + static def AZURE_TEST_MODE = "AZURE_TEST_MODE" def tmpFolder = getClass().getClassLoader().getResource("tmptestfiles") def testFolder = getClass().getClassLoader().getResource("testfiles") InterceptorManager interceptorManager @@ -39,7 +40,8 @@ class APISpec extends Specification { // Test name for test method name. def methodName - def testMode = getTestMode() + + static def testMode = getTestMode() String connectionString // If debugging is enabled, recordings cannot run as there can only be one proxy at a time. @@ -89,7 +91,7 @@ class APISpec extends Specification { *

  • Playback: (default if no test mode setup)
  • * */ - def getTestMode() { + static def getTestMode() { def azureTestMode = Configuration.getGlobalConfiguration().get(AZURE_TEST_MODE) if (azureTestMode != null) { @@ -105,6 +107,10 @@ class APISpec extends Specification { return TestMode.PLAYBACK } + static boolean liveMode() { + return testMode == TestMode.RECORD + } + def fileServiceBuilderHelper(final InterceptorManager interceptorManager) { if (testMode == TestMode.RECORD) { return new FileServiceClientBuilder() diff --git a/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/StorageFileInputOutputStreamTests.groovy b/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/StorageFileInputOutputStreamTests.groovy new file mode 100644 index 000000000000..40d02762f721 --- /dev/null +++ b/sdk/storage/azure-storage-file/src/test/java/com/azure/storage/file/StorageFileInputOutputStreamTests.groovy @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.file + +import com.azure.storage.common.Constants +import com.azure.storage.file.APISpec +import com.azure.storage.file.FileTestHelper +import com.azure.storage.file.StorageFileInputStream +import com.azure.storage.file.StorageFileOutputStream +import spock.lang.Requires + +class StorageFileInputOutputStreamTests extends APISpec { + def fileClient + int length + + def setup() { + def shareName = testResourceName.randomName(methodName, 60) + def filePath = testResourceName.randomName(methodName, 60) + def shareClient = shareBuilderHelper(interceptorManager, shareName).buildClient() + shareClient.create() + fileClient = shareClient.getFileClient(filePath) + } + + @Requires({ APISpec.liveMode() }) + def "Upload download"() { + when: + length = 30 * Constants.MB + fileClient.create(length) + byte[] randomBytes = FileTestHelper.getRandomBuffer(length) + + StorageFileOutputStream outStream = fileClient.getFileOutputStream() + outStream.write(randomBytes) + outStream.close() + + then: + StorageFileInputStream inputStream = fileClient.openInputStream() + int b + ByteArrayOutputStream outputStream = new ByteArrayOutputStream() + try { + while ((b = inputStream.read()) != -1){ + outputStream.write(b) + } + } catch (IOException ex) { + throw new UncheckedIOException(ex) + } + byte[] randomBytes2 = outputStream.toByteArray() + assert randomBytes2 == randomBytes + } + + + @Requires({ APISpec.liveMode() }) + def "Stream with offset"() { + when: + length = 7 * Constants.MB + fileClient.create(length) + byte[] randomBytes = FileTestHelper.getRandomBuffer(9 * Constants.MB) + + StorageFileOutputStream outStream = fileClient.getFileOutputStream() + outStream.write(randomBytes, 2 * Constants.MB, length) + outStream.close() + + then: + StorageFileInputStream inputStream = fileClient.openInputStream() + byte[] b = new byte[length] + ByteArrayOutputStream outputStream = new ByteArrayOutputStream() + try { + if(inputStream.read(b) != -1){ + outputStream.write(b, 0, b.length) + } + } catch (IOException ex) { + throw new UncheckedIOException(ex) + } + byte[] randomBytes2 = outputStream.toByteArray() + assert randomBytes2 == Arrays.copyOfRange(randomBytes, 2 * Constants.MB, 9 * Constants.MB) + } +} diff --git a/sdk/storage/azure-storage-file/src/test/resources/session-records/StorageFileInputOutputStreamTestsStreamOverLimits.json b/sdk/storage/azure-storage-file/src/test/resources/session-records/StorageFileInputOutputStreamTestsStreamOverLimits.json new file mode 100644 index 000000000000..da192c8dc5d2 --- /dev/null +++ b/sdk/storage/azure-storage-file/src/test/resources/session-records/StorageFileInputOutputStreamTestsStreamOverLimits.json @@ -0,0 +1,54 @@ +{ + "networkCallRecords" : [ { + "Method" : "PUT", + "Uri" : "https://sima.file.core.windows.net/storagefileinputoutputstreamtestsstreamoverlimits65105a0a?restype=share", + "Headers" : { + "x-ms-version" : "2019-02-02", + "User-Agent" : "azsdk-java-azure-storage-file/12.0.0-preview.4 1.8.0_221; Windows 10 10.0", + "x-ms-client-request-id" : "025c4120-d52b-45db-a7ec-a71b701364ad" + }, + "Response" : { + "x-ms-version" : "2019-02-02", + "Server" : "Windows-Azure-File/1.0 Microsoft-HTTPAPI/2.0", + "ETag" : "\"0x8D74071C73C5E31\"", + "Last-Modified" : "Mon, 23 Sep 2019 22:02:51 GMT", + "retry-after" : "0", + "Content-Length" : "0", + "StatusCode" : "201", + "x-ms-request-id" : "34bf2060-901a-0065-6d5a-727ecf000000", + "Date" : "Mon, 23 Sep 2019 22:02:50 GMT", + "x-ms-client-request-id" : "025c4120-d52b-45db-a7ec-a71b701364ad" + }, + "Exception" : null + }, { + "Method" : "PUT", + "Uri" : "https://sima.file.core.windows.net/storagefileinputoutputstreamtestsstreamoverlimits65105a0a/storagefileinputoutputstreamtestsstreamoverlimits4065214f", + "Headers" : { + "x-ms-version" : "2019-02-02", + "User-Agent" : "azsdk-java-azure-storage-file/12.0.0-preview.4 1.8.0_221; Windows 10 10.0", + "x-ms-client-request-id" : "89995268-fd27-491b-8483-6f2119f1d302" + }, + "Response" : { + "x-ms-version" : "2019-02-02", + "x-ms-file-permission-key" : "9147088710610530974*15442866184092965392", + "x-ms-file-id" : "13835128424026341376", + "Server" : "Windows-Azure-File/1.0 Microsoft-HTTPAPI/2.0", + "x-ms-file-creation-time" : "2019-09-23T22:02:52.0255146Z", + "Last-Modified" : "Mon, 23 Sep 2019 22:02:52 GMT", + "retry-after" : "0", + "StatusCode" : "201", + "x-ms-request-server-encrypted" : "true", + "Date" : "Mon, 23 Sep 2019 22:02:51 GMT", + "ETag" : "\"0x8D74071C796F2AA\"", + "x-ms-file-attributes" : "Archive", + "x-ms-file-change-time" : "2019-09-23T22:02:52.0255146Z", + "x-ms-file-parent-id" : "0", + "Content-Length" : "0", + "x-ms-request-id" : "34bf2063-901a-0065-6e5a-727ecf000000", + "x-ms-client-request-id" : "89995268-fd27-491b-8483-6f2119f1d302", + "x-ms-file-last-write-time" : "2019-09-23T22:02:52.0255146Z" + }, + "Exception" : null + } ], + "variables" : [ "storagefileinputoutputstreamtestsstreamoverlimits65105a0a", "storagefileinputoutputstreamtestsstreamoverlimits4065214f" ] +} \ No newline at end of file