@@ -225,9 +225,11 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
}
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
e.printStackTrace(System.err);
throw new RuntimeException(e);
} catch (Exception e) {
System.err.println("Error occurred running tests: " + System.lineSeparator() + e);
e.printStackTrace(System.err);
} finally {
progressStatus.dispose();
}
@@ -10,13 +10,14 @@
* Represents a repeating input stream with mark support enabled.
*/
public class RepeatingInputStream extends InputStream {
private static final int RANDOM_BYTES_LENGTH = 1024 * 1024; // 1MB
private static final int RANDOM_BYTES_LENGTH = Integer.parseInt(
System.getProperty("azure.core.perf.test.data.buffer.size", "1048576")); // 1MB default
private static final byte[] RANDOM_BYTES;
private final int size;
private final long size;

private int mark = 0;
private int readLimit = Integer.MAX_VALUE;
private int pos = 0;
private long mark = 0;
private long readLimit = Long.MAX_VALUE;
private long pos = 0;

static {
Random random = new Random(0);
@@ -28,13 +29,13 @@ public class RepeatingInputStream extends InputStream {
* Creates an Instance of the repeating input stream.
* @param size the size of the stream.
*/
public RepeatingInputStream(int size) {
public RepeatingInputStream(long size) {
this.size = size;
}

@Override
public synchronized int read() {
return (pos < size) ? (RANDOM_BYTES[pos++ % RANDOM_BYTES_LENGTH] & 0xFF) : -1;
return (pos < size) ? (RANDOM_BYTES[(int) (pos++ % RANDOM_BYTES_LENGTH)] & 0xFF) : -1;
}

@Override
@@ -61,6 +62,15 @@ public synchronized void mark(int readLimit) {
this.mark = this.pos;
}

/**
* Same as {@link #mark(int)}, but accepts a {@code long} read limit.
* @param readLimit the read limit.
*/
public synchronized void mark(long readLimit) {
this.readLimit = readLimit;
this.mark = this.pos;
}

@Override
public boolean markSupported() {
return true;
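For context, a minimal sketch of how the widened API is meant to be used (illustrative values, not from this PR):

RepeatingInputStream stream = new RepeatingInputStream(8388608000L); // ~8GB logical size, beyond Integer.MAX_VALUE
stream.mark(Long.MAX_VALUE); // the new long overload; mark(int) could not express this read limit
int first = stream.read();   // bytes cycle through the fixed random buffer, so memory stays ~1MB
stream.reset();              // rewinds to the mark so a jumbo payload can be re-read on retry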
@@ -16,9 +16,9 @@
* Utility class to help with data creation for perf testing.
*/
public class TestDataCreationHelper {
private static final int RANDOM_BYTES_LENGTH = 1024 * 1024; // 1MB
private static final int RANDOM_BYTES_LENGTH = Integer.parseInt(
System.getProperty("azure.core.perf.test.data.buffer.size", "1048576")); // 1MB default
private static final byte[] RANDOM_BYTES;
private static final int SIZE = (1024 * 1024 * 1024) + 1;

static {
Random random = new Random(0);
@@ -34,10 +34,10 @@ public class TestDataCreationHelper {
* @return The created {@link Flux}
*/
private static Flux<ByteBuffer> createCircularByteBufferFlux(byte[] array, long size) {
int quotient = (int) size / array.length;
int remainder = (int) size % array.length;
long quotient = size / array.length;
int remainder = (int) (size % array.length);

return Flux.range(0, quotient)
return Flux.just(Boolean.TRUE).repeat(quotient - 1)
.map(i -> allocateByteBuffer(array, array.length))
.concatWithValues(allocateByteBuffer(array, remainder));
}
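The repeat-based construction above is needed because Flux.range(int, int) counts with ints and cannot emit more than Integer.MAX_VALUE items, whereas repeat(long) accepts a long. A small sketch of the pattern (ByteBuffer.wrap stands in for the helper's allocateByteBuffer):

byte[] array = new byte[1024];
long quotient = Integer.MAX_VALUE + 10L; // more chunks than Flux.range could count
Flux<ByteBuffer> chunks = Flux.just(Boolean.TRUE)
    .repeat(quotient - 1) // the first emission plus (quotient - 1) repeats = quotient buffers
    .map(ignored -> ByteBuffer.wrap(array, 0, array.length));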
@@ -66,14 +66,9 @@ public static Flux<ByteBuffer> createRandomByteBufferFlux(long size) {
*
* @param size the size of the stream
* @return the {@link InputStream} of {@code size}
* @throws IllegalArgumentException if {@code size} is more than {@link #SIZE}
*/
public static InputStream createRandomInputStream(long size) {
if (size > SIZE) {
throw new IllegalArgumentException("size must be <= " + SIZE);
}

return new RepeatingInputStream((int) size);
return new RepeatingInputStream(size);
}

/**
@@ -84,10 +79,10 @@ public static InputStream createRandomInputStream(long size) {
* @throws IOException If an IO error occurs.
*/
public static void writeBytesToOutputStream(OutputStream outputStream, long size) throws IOException {
int quotient = (int) size / RANDOM_BYTES.length;
int remainder = (int) size % RANDOM_BYTES.length;
long quotient = size / RANDOM_BYTES.length;
int remainder = (int) (size % RANDOM_BYTES.length);

for (int i = 0; i < quotient; i++) {
for (long i = 0; i < quotient; i++) {
outputStream.write(RANDOM_BYTES);
}

@@ -735,6 +735,8 @@
<Package name="com.azure.storage.blob.perf.core"/>
<Package name="com.azure.storage.file.datalake.perf"/>
<Package name="com.azure.storage.file.datalake.perf.core"/>
<Package name="com.azure.storage.file.share.perf"/>
<Package name="com.azure.storage.file.share.perf.core"/>
</Or>
<Bug pattern="BC_UNCONFIRMED_CAST, DM_EXIT, RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"/>
</Match>
@@ -572,12 +572,14 @@ buffers is not a common scenario for async like it is in sync (and we already bu
// no specified length: use azure.core's converter
if (data == null && options.getOptionalLength() == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
parallelTransferOptions.getBlockSizeLong());
data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
} else if (data == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
parallelTransferOptions.getBlockSizeLong());
data = Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getOptionalLength(), chunkSize, false);
}
@@ -71,6 +71,16 @@ public final class Constants {
*/
public static final int BUFFER_COPY_LENGTH = 8 * KB;

/**
* This constant caps the Stream->Flux converter's block size, considering that:
* - Integer.MAX_VALUE (or values near it) leads to java.lang.OutOfMemoryError: Requested array size exceeds VM limit.
* - Allocating very large arrays is less likely to succeed on a busy, fragmented heap and puts extra pressure on the
* GC to de-fragment.
* - Going too small, on the other hand, can hurt large upload scenarios. The maximum block size is 4000MB, so
* chunking it into much smaller blocks produces a lot of garbage just to wrap the data into ByteBuffers.
*/
public static final int MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH = 64 * MB;
Member:

How did we come to this value? Should we add some documentation explaining why we need to do this?

Contributor Author:

  1. We know that allocating Int.MAX leads to java.lang.OutOfMemoryError: Requested array size exceeds VM limit (there are plenty of articles about it).
  2. Allocating big arrays isn't great - they put more pressure on the GC if the heap is fragmented, and there's a lower chance such an allocation succeeds.
  3. The value is a guesstimate based on a few experiments, i.e. for jumbo blobs, going too low means too much extra garbage just to wrap arrays into buffers.


public static final String STORAGE_SCOPE = "https://storage.azure.com/.default";

public static final String STORAGE_LOG_STRING_TO_SIGN = "Azure-Storage-Log-String-To-Sign";
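To illustrate how the upload call sites in this PR consume the new constant (block size chosen to match the jumbo scenarios below, not taken from the diff):

long blockSizeLong = 2147483648L; // 2GB blocks, as in the jumbo memory-stress scenarios
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH, blockSizeLong);
// chunkSize == 64 * MB: the converter buffers 64MB arrays instead of attempting a 2GB allocation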
@@ -329,12 +329,14 @@ public Mono<Response<PathInfo>> uploadWithResponse(FileParallelUploadOptions opt
// no specified length: use azure.core's converter
if (data == null && options.getOptionalLength() == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
validatedParallelTransferOptions.getBlockSizeLong());
data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
} else if (data == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
validatedParallelTransferOptions.getBlockSizeLong());
data = Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getOptionalLength(), chunkSize, false);
}
@@ -1513,12 +1513,14 @@ Mono<Response<ShareFileUploadInfo>> uploadWithResponse(ShareFileUploadOptions op
// no specified length: use azure.core's converter
if (data == null && options.getLength() == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
validatedParallelTransferOptions.getBlockSizeLong());
data = FluxUtil.toFluxByteBuffer(options.getDataStream(), chunkSize);
// specified length (legacy requirement): use custom converter. no marking because we buffer anyway.
} else if (data == null) {
// We can only buffer up to max int due to restrictions in ByteBuffer.
int chunkSize = (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong());
int chunkSize = (int) Math.min(Constants.MAX_INPUT_STREAM_CONVERTER_BUFFER_LENGTH,
validatedParallelTransferOptions.getBlockSizeLong());
data = Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getLength(), chunkSize, false);
}
19 changes: 17 additions & 2 deletions sdk/storage/azure-storage-perf/memory-stress-scenarios.ps1
@@ -9,10 +9,12 @@ function Run-Scenario {
[Parameter(Mandatory=$true, Position=0)]
[string] $HeapSize,
[Parameter(Mandatory=$true, Position=1)]
[string] $Scenario
[string] $Scenario,
[Parameter(Mandatory=$false, Position=2)]
[string] $ExtraFlags
)
Write-Host "Executing '$Scenario' with '$HeapSize' heap"
Invoke-Expression "& '$JavaPath' -Xms$HeapSize -Xmx$HeapSize -jar '$PerfJarPath' $Scenario"
Invoke-Expression "& '$JavaPath' -Xms$HeapSize -Xmx$HeapSize '$ExtraFlags' -jar '$PerfJarPath' $Scenario"
if ($LASTEXITCODE -ne 0) {
Write-Host "Scenario failed, exiting"
exit 1
@@ -31,6 +33,11 @@ Run-Scenario "50m" "uploadoutputstream --warmup 0 --duration 1 --size 1048576000
Run-Scenario "50m" "uploadblobnolength --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
Run-Scenario "50m" "uploadblob --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
Run-Scenario "50m" "uploadblob --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576"
# Jumbo blobs
Run-Scenario "7g" "uploadblob --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1" "-Dazure.core.perf.test.data.buffer.size=104857600"
Run-Scenario "7g" "uploadblob --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
Run-Scenario "7g" "uploadblobnolength --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
Run-Scenario "7g" "uploadoutputstream --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"

# DataLake
$env:STORAGE_CONNECTION_STRING=$env:STORAGE_DATA_LAKE_CONNECTION_STRING
@@ -40,3 +47,11 @@ Run-Scenario "300m" "uploadfiledatalake --warmup 0 --duration 1 --size 104857600
# Small transfer options
Run-Scenario "50m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576 --sync"
Run-Scenario "50m" "uploadfiledatalake --warmup 0 --duration 1 --size 1048576000 --transfer-single-upload-size 4194304 --transfer-block-size 1048576"
# Jumbo files
Run-Scenario "7g" "uploadfiledatalake --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1 --sync" "-Dazure.core.perf.test.data.buffer.size=104857600"
Run-Scenario "7g" "uploadfiledatalake --warmup 0 --duration 1 --size 8388608000 --transfer-block-size 2147483648 --transfer-concurrency 1" "-Dazure.core.perf.test.data.buffer.size=104857600"

# Shares
$env:STORAGE_CONNECTION_STRING=$env:PRIMARY_STORAGE_CONNECTION_STRING
Run-Scenario "100m" "uploadfileshare --warmup 0 --duration 1 --size 1048576000 --sync"
Run-Scenario "200m" "uploadfileshare --warmup 0 --duration 1 --size 1048576000"
@@ -22,9 +22,14 @@ public class UploadBlobNoLengthTest extends BlobTestBase<StoragePerfStressOption

public UploadBlobNoLengthTest(StoragePerfStressOptions options) {
super(options);
inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
inputStream.mark(Integer.MAX_VALUE);
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
if (options.isSync()) {
inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
inputStream.mark(Long.MAX_VALUE);
byteBufferFlux = null;
} else {
inputStream = null;
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
}
}

@Override
@@ -22,9 +22,14 @@ public class UploadBlobTest extends BlobTestBase<StoragePerfStressOptions> {

public UploadBlobTest(StoragePerfStressOptions options) {
super(options);
inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
inputStream.mark(Integer.MAX_VALUE);
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
if (options.isSync()) {
inputStream = (RepeatingInputStream) createRandomInputStream(options.getSize());
inputStream.mark(Long.MAX_VALUE);
byteBufferFlux = null;
} else {
inputStream = null;
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
}
}

@Override
@@ -22,8 +22,13 @@ public class UploadFileDatalakeTest extends FileTestBase<StoragePerfStressOption

public UploadFileDatalakeTest(StoragePerfStressOptions options) {
super(options);
inputStream = (RepeatingInputStream) TestDataCreationHelper.createRandomInputStream(options.getSize());
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
if (options.isSync()) {
inputStream = (RepeatingInputStream) TestDataCreationHelper.createRandomInputStream(options.getSize());
byteBufferFlux = null;
} else {
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
inputStream = null;
}
}

@Override
@@ -3,9 +3,10 @@

package com.azure.storage.file.share.perf;

import com.azure.perf.test.core.PerfStressOptions;
import com.azure.perf.test.core.RepeatingInputStream;
import com.azure.perf.test.core.TestDataCreationHelper;
import com.azure.storage.StoragePerfStressOptions;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.file.share.perf.core.FileTestBase;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -14,11 +15,11 @@

import static com.azure.perf.test.core.TestDataCreationHelper.createRandomByteBufferFlux;

public class UploadFileShareTest extends FileTestBase<PerfStressOptions> {
public class UploadFileShareTest extends FileTestBase<StoragePerfStressOptions> {
protected final RepeatingInputStream inputStream;
protected final Flux<ByteBuffer> byteBufferFlux;

public UploadFileShareTest(PerfStressOptions options) {
public UploadFileShareTest(StoragePerfStressOptions options) {
super(options);
inputStream = (RepeatingInputStream) TestDataCreationHelper.createRandomInputStream(options.getSize());
byteBufferFlux = createRandomByteBufferFlux(options.getSize());
@@ -27,12 +28,20 @@ public UploadFileShareTest(PerfStressOptions options) {
@Override
public void run() {
inputStream.reset();
shareFileClient.upload(inputStream, options.getSize());
ParallelTransferOptions transferOptions = new ParallelTransferOptions()
.setMaxSingleUploadSizeLong(options.getTransferSingleUploadSize())
.setBlockSizeLong(options.getTransferBlockSize())
.setMaxConcurrency(options.getTransferConcurrency());
shareFileClient.upload(inputStream, options.getSize(), transferOptions);
}

@Override
public Mono<Void> runAsync() {
return shareFileAsyncClient.upload(byteBufferFlux, options.getSize())
ParallelTransferOptions transferOptions = new ParallelTransferOptions()
.setMaxSingleUploadSizeLong(options.getTransferSingleUploadSize())
.setBlockSizeLong(options.getTransferBlockSize())
.setMaxConcurrency(options.getTransferConcurrency());
return shareFileAsyncClient.upload(byteBufferFlux, transferOptions)
.then();
}

2 changes: 1 addition & 1 deletion sdk/storage/tests.yml
@@ -72,7 +72,7 @@ stages:
JAVA_HOME: $(JAVA_HOME_11_X64)
${{ if eq(variables['JavaTestVersion'], '1.8') }}:
JAVA_HOME: $(JAVA_HOME_8_X64)
condition: and(succeeded(), not(contains(variables['OSVmImage'], 'mac')))
condition: and(succeeded(), contains(variables['OSVmImage'], 'ubuntu'))
- pwsh: |
New-Item $(Build.ArtifactStagingDirectory)/test-logs -ItemType directory
Copy-Item sdk/storage/azure-storage-blob/target/test.log $(Build.ArtifactStagingDirectory)/test-logs/azure-storage-blob-test.log -ErrorAction SilentlyContinue