diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
index ac109b8afb7d..e44d844f2f45 100644
--- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java
@@ -225,9 +225,11 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int parallel
             }
         } catch (InterruptedException | ExecutionException e) {
             System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
+            e.printStackTrace(System.err);
             throw new RuntimeException(e);
         } catch (Exception e) {
             System.err.println("Error occurred running tests: " + System.lineSeparator() + e);
+            e.printStackTrace(System.err);
         } finally {
             progressStatus.dispose();
         }
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/RepeatingInputStream.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/RepeatingInputStream.java
index 1c102de65ff1..6ed427565fd0 100644
--- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/RepeatingInputStream.java
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/RepeatingInputStream.java
@@ -10,13 +10,14 @@
  * Represents a repeating input stream with mark support enabled.
  */
 public class RepeatingInputStream extends InputStream {
-    private static final int RANDOM_BYTES_LENGTH = 1024 * 1024; // 1MB
+    private static final int RANDOM_BYTES_LENGTH = Integer.parseInt(
+        System.getProperty("azure.core.perf.test.data.buffer.size", "1048576")); // 1 MB default
     private static final byte[] RANDOM_BYTES;
-    private final int size;
+    private final long size;
 
-    private int mark = 0;
-    private int readLimit = Integer.MAX_VALUE;
-    private int pos = 0;
+    private long mark = 0;
+    private long readLimit = Long.MAX_VALUE;
+    private long pos = 0;
 
     static {
         Random random = new Random(0);
@@ -28,13 +29,13 @@ public class RepeatingInputStream extends InputStream {
      * Creates an instance of the repeating input stream.
      * @param size the size of the stream.
      */
-    public RepeatingInputStream(int size) {
+    public RepeatingInputStream(long size) {
         this.size = size;
     }
 
     @Override
     public synchronized int read() {
-        return (pos < size) ? (RANDOM_BYTES[pos++ % RANDOM_BYTES_LENGTH] & 0xFF) : -1;
+        return (pos < size) ? (RANDOM_BYTES[(int) (pos++ % RANDOM_BYTES_LENGTH)] & 0xFF) : -1;
     }
 
     @Override
@@ -61,6 +62,15 @@ public synchronized void mark(int readLimit) {
         this.mark = this.pos;
     }
 
+    /**
+     * Same as {@link #mark(int)} but accepts a {@code long} read limit.
+     * @param readLimit read limit.
+     */
+    public synchronized void mark(long readLimit) {
+        this.readLimit = readLimit;
+        this.mark = this.pos;
+    }
+
     @Override
     public boolean markSupported() {
         return true;
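With `size`, `pos`, and `mark` widened to `long`, `RepeatingInputStream` can model payloads past the 2 GiB `int` ceiling while still serving bytes from the fixed repeating buffer. A minimal usage sketch (the size constant and the counting loop are illustrative only, and it assumes code living in, or importing, `com.azure.perf.test.core`):

```java
import java.io.IOException;

public class RepeatingInputStreamDemo {
    public static void main(String[] args) throws IOException {
        long size = Integer.MAX_VALUE + 1L; // just past the old int limit
        RepeatingInputStream in = new RepeatingInputStream(size);
        in.mark(Long.MAX_VALUE); // new long overload; mark(int) caps the read limit near 2 GiB

        byte[] buffer = new byte[1024 * 1024];
        long total = 0;
        for (int read = in.read(buffer); read != -1; read = in.read(buffer)) {
            total += read; // content repeats every RANDOM_BYTES_LENGTH bytes
        }
        System.out.println(total == size); // true

        in.reset(); // rewind to the mark so the stream can be replayed
    }
}
```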
diff --git a/common/perf-test-core/src/main/java/com/azure/perf/test/core/TestDataCreationHelper.java b/common/perf-test-core/src/main/java/com/azure/perf/test/core/TestDataCreationHelper.java
index d22e5006b63e..a004ce93be22 100644
--- a/common/perf-test-core/src/main/java/com/azure/perf/test/core/TestDataCreationHelper.java
+++ b/common/perf-test-core/src/main/java/com/azure/perf/test/core/TestDataCreationHelper.java
@@ -16,9 +16,9 @@
  * Utility class to help with data creation for perf testing.
  */
 public class TestDataCreationHelper {
-    private static final int RANDOM_BYTES_LENGTH = 1024 * 1024; // 1MB
+    private static final int RANDOM_BYTES_LENGTH = Integer.parseInt(
+        System.getProperty("azure.core.perf.test.data.buffer.size", "1048576")); // 1 MB default
     private static final byte[] RANDOM_BYTES;
-    private static final int SIZE = (1024 * 1024 * 1024) + 1;
 
     static {
         Random random = new Random(0);
@@ -34,10 +34,10 @@ public class TestDataCreationHelper {
      * @return The created {@link Flux}
      */
     private static Flux<ByteBuffer> createCircularByteBufferFlux(byte[] array, long size) {
-        int quotient = (int) size / array.length;
-        int remainder = (int) size % array.length;
+        long quotient = size / array.length;
+        int remainder = (int) (size % array.length);
 
-        return Flux.range(0, quotient)
+        return Flux.just(Boolean.TRUE).repeat(quotient - 1)
             .map(i -> allocateByteBuffer(array, array.length))
             .concatWithValues(allocateByteBuffer(array, remainder));
     }
@@ -66,14 +66,9 @@ public static Flux<ByteBuffer> createRandomByteBufferFlux(long size) {
      *
      * @param size the size of the stream
      * @return the {@link InputStream} of {@code size}
-     * @throws IllegalArgumentException if {@code size} is more than {@link #SIZE}
      */
     public static InputStream createRandomInputStream(long size) {
-        if (size > SIZE) {
-            throw new IllegalArgumentException("size must be <= " + SIZE);
-        }
-
-        return new RepeatingInputStream((int) size);
+        return new RepeatingInputStream(size);
     }
 
     /**
@@ -84,10 +79,10 @@ public static InputStream createRandomInputStream(long size) {
      * @throws IOException If an IO error occurs.
      */
     public static void writeBytesToOutputStream(OutputStream outputStream, long size) throws IOException {
-        int quotient = (int) size / RANDOM_BYTES.length;
-        int remainder = (int) size % RANDOM_BYTES.length;
+        long quotient = size / RANDOM_BYTES.length;
+        int remainder = (int) (size % RANDOM_BYTES.length);
 
-        for (int i = 0; i < quotient; i++) {
+        for (long i = 0; i < quotient; i++) {
             outputStream.write(RANDOM_BYTES);
         }
diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
index 2c6af81d7e59..2da2d12d64ac 100755
--- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
+++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
@@ -735,6 +735,8 @@
+
+
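Two subtle fixes hide in `createCircularByteBufferFlux`. The old `(int) size / array.length` cast `size` to `int` before dividing, silently truncating anything above 2 GiB, and `Flux.range` only accepts an `int` count. Dividing in `long` and using `repeat(quotient - 1)` (the original element plus `quotient - 1` repeats gives `quotient` emissions) removes both limits. A standalone Reactor sketch of the pattern, with `ByteBuffer.wrap` standing in for the helper's `allocateByteBuffer` and assuming `size >= array.length` so the repeat count stays non-negative:

```java
import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;

public class CircularFluxDemo {
    public static void main(String[] args) {
        byte[] array = new byte[1024 * 1024];     // 1 MB repeating chunk
        long size = (2L * Integer.MAX_VALUE) + 7; // > 4 GiB, breaks the old (int) size cast

        long quotient = size / array.length;         // full chunks, computed in long
        int remainder = (int) (size % array.length); // < array.length, safe to narrow

        Flux<ByteBuffer> flux = Flux.just(Boolean.TRUE)
            .repeat(quotient - 1)                    // quotient emissions in total
            .map(ignored -> ByteBuffer.wrap(array, 0, array.length))
            .concatWithValues(ByteBuffer.wrap(array, 0, remainder));

        long total = flux.reduce(0L, (sum, buf) -> sum + buf.remaining()).block();
        System.out.println(total == size); // true
    }
}
```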
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
index d437b8005045..11dc06fe25b4 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java
@@ -572,12 +572,14 @@ buffers is not a common scenario for async like it is in sync (and we already bu
         // no specified length: use azure.core's converter
         if (data == null && options.getOptionalLength() == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                parallelTransferOptions.getBlockSizeLong());
             data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
         // specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
         } else if (data == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                parallelTransferOptions.getBlockSizeLong());
             data = Utility.convertStreamToByteBuffer(
                 options.getDataStream(), options.getOptionalLength(), chunkSize, false);
         }
diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java
index 87a89e5a525d..86d75c443ade 100644
--- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java
+++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java
@@ -71,6 +71,16 @@ public final class Constants {
      */
     public static final int BUFFER_COPY_LENGTH = 8 * KB;
 
+    /**
+     * Caps the Stream->Flux converter's block size, considering that:
+     * - Integer.MAX_VALUE (or values near it) leads to java.lang.OutOfMemoryError: Requested array size exceeds VM limit.
+     * - Allocating very large arrays can fail on a busy heap and puts extra pressure on the GC to defragment.
+     * - Going too small, on the other hand, hurts large upload scenarios: the maximum block size is 4000 MB, so
+     *   chunking it into much smaller blocks produces a lot of garbage just to wrap the data in ByteBuffers.
+     */
+    public static final int MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH = 64 * MB;
+
     public static final String STORAGE_SCOPE = "https://storage.azure.com/.default";
 
     public static final String STORAGE_LOG_STRING_TO_SIGN = "Azure-Storage-Log-String-To-Sign";
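The 64 MB cap replaces `Integer.MAX_VALUE` because the converters back each chunk with a single `byte[]`, and the JVM refuses array allocations at or near `Integer.MAX_VALUE` elements ("Requested array size exceeds VM limit"). A back-of-the-envelope sketch (figures illustrative) of why the cap still keeps per-block garbage low for the largest allowed block:

```java
public class ChunkSizeMath {
    public static void main(String[] args) {
        final int mb = 1024 * 1024;
        long blockSize = 4000L * mb; // largest block the service accepts
        int cap = 64 * mb;           // MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH

        // Large enough to amortize wrapping costs, small enough to allocate reliably.
        int chunkSize = (int) Math.min(cap, blockSize);
        long buffersPerBlock = (blockSize + chunkSize - 1) / chunkSize;
        System.out.println(chunkSize + " bytes/chunk, " + buffersPerBlock + " chunks/block"); // 63 chunks
    }
}
```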
diff --git a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java
index 60d88007e040..b029e56a08a5 100644
--- a/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java
+++ b/sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java
@@ -329,12 +329,14 @@ public Mono<Response<PathInfo>> uploadWithResponse(FileParallelUploadOptions options
         // no specified length: use azure.core's converter
         if (data == null && options.getOptionalLength() == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                validatedParallelTransferOptions.getBlockSizeLong());
             data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
         // specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
         } else if (data == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                validatedParallelTransferOptions.getBlockSizeLong());
             data = Utility.convertStreamToByteBuffer(
                 options.getDataStream(), options.getOptionalLength(), chunkSize, false);
         }
diff --git a/sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java b/sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java
index 3ea32684dd6f..6758c15fa52a 100644
--- a/sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java
+++ b/sdk/storage/azure-storage-file-share/src/main/java/com/azure/storage/file/share/ShareFileAsyncClient.java
@@ -1513,12 +1513,14 @@ Mono<Response<ShareFileUploadInfo>> uploadWithResponse(ShareFileUploadOptions options
         // no specified length: use azure.core's converter
         if (data == null && options.getLength() == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                validatedParallelTransferOptions.getBlockSizeLong());
             data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
         // specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
         } else if (data == null) {
             // We can only buffer up to max int due to restrictions in ByteBuffer.
-            int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
+            int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
+                validatedParallelTransferOptions.getBlockSizeLong());
             data = Utility.convertStreamToByteBuffer(
                 options.getDataStream(), options.getLength(), chunkSize, false);
         }
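Blob, Data Lake, and share clients all apply the same two-branch conversion; only the option accessors differ. A condensed sketch of that shared shape (the `toByteBufferFlux` helper and its parameters are illustrative, not an SDK API; `FluxUtil` is azure-core's converter and `Utility.convertStreamToByteBuffer` the storage-internal one from the diff):

```java
import com.azure.core.util.FluxUtil;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.Utility;
import java.io.InputStream;
import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;

public final class StreamConversionSketch {
    static Flux<ByteBuffer> toByteBufferFlux(InputStream stream, Long length, long blockSize) {
        // Cap each buffered chunk at 64 MB instead of Integer.MAX_VALUE.
        int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH, blockSize);
        if (length == null) {
            return FluxUtil.toFluxByteBuffer(stream, chunkSize); // azure-core converter
        }
        // Custom converter honors the caller-supplied length (legacy requirement); no marking needed.
        return Utility.convertStreamToByteBuffer(stream, length, chunkSize, false);
    }
}
```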
diff --git a/sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1 b/sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1
index f6f89b3da357..caf6621cbac1 100644
--- a/sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1
+++ b/sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1
@@ -9,10 +9,12 @@ function Run-Scenario {
         [Parameter(Mandatory=$true, Position=0)]
         [string] $HeapSize,
         [Parameter(Mandatory=$true, Position=1)]
-        [string] $Scenario
+        [string] $Scenario,
+        [Parameter(Mandatory=$false, Position=2)]
+        [string] $ExtraFlags
     )
     Write-Host "Executing '$Scenario' with '$HeapSize' heap"
-    Invoke-Expression "& '$JavaPath' -Xms$HeapSize -Xmx$HeapSize -jar '$PerfJarPath' $Scenario"
+    Invoke-Expression "& '$JavaPath' -Xms$HeapSize -Xmx$HeapSize '$ExtraFlags' -jar '$PerfJarPath' $Scenario"
     if ($LASTEXITCODE -ne 0) {
         Write-Host "Scenario failed, exiting"
         exit 1
@@ -31,6 +33,11 @@ Run-Scenario "50m" "uploadoutputstream --warmup 0 --duration 1 --size 1048576000
 Run-Scenario "50m" "uploadblobnolength --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
 Run-Scenario "50m" "uploadblob --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
 Run-Scenario "50m" "uploadblob --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576"
+# Jumbo blobs
+Run-Scenario "7g" "uploadblob --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1" "-Dazure.core.perf.test.data.buffer.size=104857600"
+Run-Scenario "7g" "uploadblob --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
+Run-Scenario "7g" "uploadblobnolength --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
+Run-Scenario "7g" "uploadoutputstream --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
 
 # DataLake
 $env:STORAGE_CONNECTION_STRING=$env:STORAGE_DATA_LAKE_CONNECTION_STRING
@@ -40,3 +47,11 @@ Run-Scenario "300m" "uploadfiledatalake --warmup 0 --duration 1 --size 104857600
 # Small transfer options
 Run-Scenario "50m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
 Run-Scenario "50m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576"
+# Jumbo files
+Run-Scenario "7g" "uploadfiledatalake --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
+Run-Scenario "7g" "uploadfiledatalake --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1" "-Dazure.core.perf.test.data.buffer.size=104857600"
+
+# Shares
+$env:STORAGE_CONNECTION_STRING=$env:PRIMARY_STORAGE_CONNECTION_STRING
+Run-Scenario "100m" "uploadfileshare --warmup 0 --duration 1 --size 1048576000 --sync"
+Run-Scenario "200m" "uploadfileshare --warmup 0 --duration 1 --size 1048576000"
diff --git a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobNoLengthTest.java b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobNoLengthTest.java
index 77f42e40085f..c0833ef984b8 100644
--- a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobNoLengthTest.java
+++ b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/blob/perf/UploadBlobNoLengthTest.java
@@ -22,9 +22,14 @@ public class UploadBlobNoLengthTest extends BlobTestBase<StoragePerfStressOptions> {
     public UploadBlobNoLengthTest(StoragePerfStressOptions options) {
         super(options);
-        inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
-        inputStream.mark(Integer.MAX_VALUE);
-        byteBufferFlux = createRandomByteBufferFlux(options.getSize());
+        if (options.isSync()) {
+            inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
+            inputStream.mark(Long.MAX_VALUE);
+            byteBufferFlux = null;
+        } else {
+            inputStream = null;
+            byteBufferFlux = createRandomByteBufferFlux(options.getSize());
+        }
     }
 
     @Override
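The jumbo scenarios above push a single 8,388,608,000-byte (8000 MB) upload through 2 GiB blocks, so the block size itself no longer fits in an `int`, and `-Dazure.core.perf.test.data.buffer.size=104857600` grows the repeating data buffer to 100 MB so payload generation is not the bottleneck. An equivalent configuration in code (a sketch; the import path for the blob flavor of `ParallelTransferOptions` is an assumption here):

```java
import com.azure.storage.blob.models.ParallelTransferOptions;

public class JumboTransferOptionsDemo {
    public static void main(String[] args) {
        // Mirrors --transfer-block-size 2147483648 --transfer-concurrency 1 above.
        ParallelTransferOptions jumbo = new ParallelTransferOptions()
            .setBlockSizeLong(2_147_483_648L) // 2 GiB blocks, expressible only via the long setter
            .setMaxConcurrency(1);            // one block in flight keeps the 7g heap sufficient
        System.out.println(jumbo.getBlockSizeLong());
    }
}
```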
diff --git a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/datalake/perf/UploadFileDatalakeTest.java b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/datalake/perf/UploadFileDatalakeTest.java
index 7391efb5ff3a..40f9203ecda8 100644
--- a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/datalake/perf/UploadFileDatalakeTest.java
+++ b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/datalake/perf/UploadFileDatalakeTest.java
@@ -22,8 +22,13 @@ public class UploadFileDatalakeTest extends FileTestBase<StoragePerfStressOptions> {
diff --git a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/share/perf/UploadFileShareTest.java b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/share/perf/UploadFileShareTest.java
--- a/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/share/perf/UploadFileShareTest.java
+++ b/sdk/storage/azure-storage-perf/src/main/java/com/azure/storage/file/share/perf/UploadFileShareTest.java
-public class UploadFileShareTest extends FileTestBase<PerfStressOptions> {
+public class UploadFileShareTest extends FileTestBase<StoragePerfStressOptions> {
     protected final RepeatingInputStream inputStream;
     protected final Flux<ByteBuffer> byteBufferFlux;
 
-    public UploadFileShareTest(PerfStressOptions options) {
+    public UploadFileShareTest(StoragePerfStressOptions options) {
         super(options);
         inputStream = (RepeatingInputStream) TestDataCreationHelper.createRandomInputStream(options.getSize());
         byteBufferFlux = createRandomByteBufferFlux(options.getSize());
@@ -27,12 +28,20 @@ public UploadFileShareTest(PerfStressOptions options) {
     @Override
     public void run() {
         inputStream.reset();
-        shareFileClient.upload(inputStream, options.getSize());
+        ParallelTransferOptions transferOptions = new ParallelTransferOptions()
+            .setMaxSingleUploadSizeLong(options.getTransferSingleUploadSize())
+            .setBlockSizeLong(options.getTransferBlockSize())
+            .setMaxConcurrency(options.getTransferConcurrency());
+        shareFileClient.upload(inputStream, options.getSize(), transferOptions);
     }
 
     @Override
     public Mono<Void> runAsync() {
-        return shareFileAsyncClient.upload(byteBufferFlux, options.getSize())
+        ParallelTransferOptions transferOptions = new ParallelTransferOptions()
+            .setMaxSingleUploadSizeLong(options.getTransferSingleUploadSize())
+            .setBlockSizeLong(options.getTransferBlockSize())
+            .setMaxConcurrency(options.getTransferConcurrency());
+        return shareFileAsyncClient.upload(byteBufferFlux, transferOptions)
             .then();
     }
diff --git a/sdk/storage/tests.yml b/sdk/storage/tests.yml
index 9a9a3b15f808..a660a0106a72 100644
--- a/sdk/storage/tests.yml
+++ b/sdk/storage/tests.yml
@@ -72,7 +72,7 @@ stages:
             JAVA_HOME: $(JAVA_HOME_11_X64)
           ${{ if eq(variables['JavaTestVersion'], '1.8') }}:
             JAVA_HOME: $(JAVA_HOME_8_X64)
-        condition: and(succeeded(), not(contains(variables['OSVmImage'], 'mac')))
+        condition: and(succeeded(), contains(variables['OSVmImage'], 'ubuntu'))
       - pwsh: |
          New-Item $(Build.ArtifactStagingDirectory)/test-logs -ItemType directory
          Copy-Item sdk/storage/azure-storage-blob/target/test.log $(Build.ArtifactStagingDirectory)/test-logs/azure-storage-blob-test.log -ErrorAction SilentlyContinue