Skip to content

Commit

Permalink
Merge pull request Azure#380 from rickle-msft/dev
Browse files Browse the repository at this point in the history
Added support for progress reporting
  • Loading branch information
prjain-msft authored Oct 12, 2018
2 parents 5965a69 + c362ce7 commit 9a28f04
Show file tree
Hide file tree
Showing 13 changed files with 534 additions and 64 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ XXXX.XX.XX Version XX.X.X
* Added overloads which only accept the required parameters.
* TransferManager options now throw if an IProgressReceiver is passed as it is not currently supported. Support will be added in a later release.
* Added CopyFromURL, which will do a synchronous server-side copy, meaning the service will not return an HTTP response until it has completed the copy.
* Added support for IProgressReceiver in TransferManager operations. This parameter was previously ignored but is now supported.
* Removed internal dependency on javafx to be compatible with openjdk.

2018.09.11 Version 10.1.0
* Interfaces for helper types updated to be more consistent throughout the library. All types, with the exception of the options for pipeline factories, use a fluent pattern.
Expand Down
13 changes: 0 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,6 @@
</pluginRepository>
</pluginRepositories>

<repositories>
<repository>
<id>ossrh</id>
<name>Sonatype Snapshots</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<layout>default</layout>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.microsoft.rest.v2</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@
*/
package com.microsoft.azure.storage.blob;

import io.reactivex.Flowable;

/**
* An {@code IProgressReceiver} is an object that can be used to report progress on network transfers. When specified on
* transfer operations, the {@code reportProgress} method will be called periodically with the total number of bytes
* transferred. The user may configure this method to report progress in whatever format desired.
* transferred. The user may configure this method to report progress in whatever format desired. It is recommended
* that this type be used in conjunction with
* {@link ProgressReporter#addProgressReporting(Flowable, IProgressReceiver)}.
*/
public interface IProgressReceiver {

Expand Down
181 changes: 181 additions & 0 deletions src/main/java/com/microsoft/azure/storage/blob/ProgressReporter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright Microsoft Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.microsoft.azure.storage.blob;

import io.reactivex.Flowable;
import io.reactivex.Single;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;

/**
* {@code ProgressReporterImpl} offers a convenient way to add progress tracking to a given Flowable.
*/
public final class ProgressReporter {

private static abstract class ProgressReporterImpl implements IProgressReceiver{
long blockProgress;

final IProgressReceiver progressReceiver;

ProgressReporterImpl(IProgressReceiver progressReceiver) {
this.blockProgress = 0;
this.progressReceiver = progressReceiver;
}

@Override
public void reportProgress(long bytesTransferred) {
this.blockProgress += bytesTransferred;
}

void rewindProgress() {
this.blockProgress = 0;
}

Flowable<ByteBuffer> addProgressReporting(Flowable<ByteBuffer> data) {
return Single.just(this)
.flatMapPublisher(progressReporter -> {
/*
Each time there is a new subscription, we will rewind the progress. This is desirable specifically
for retries, which resubscribe on each try. The first time this flowable is subscribed to, the
rewind will be a noop as there will have been no progress made. Subsequent rewinds will work as
expected.
*/
progressReporter.rewindProgress();
/*
Every time we emit some data, report it to the Tracker, which will pass it on to the end user.
*/
return data.doOnNext(buffer ->
progressReporter.reportProgress(buffer.remaining()));
});
}
}

/**
* This type is used to keep track of the total amount of data transferred for a single request. This is the type
* we will use when the customer uses the factory to add progress reporting to their Flowable. We need this
* additional type because we can't keep local state directly as lambdas require captured local variables to be
* effectively final.
*/
private static class SequentialProgressReporter extends ProgressReporterImpl {
SequentialProgressReporter(IProgressReceiver progressReceiver) {
super(progressReceiver);
}

@Override
public void reportProgress(long bytesTransferred) {
super.reportProgress(bytesTransferred);
this.progressReceiver.reportProgress(this.blockProgress);
}
}

/**
* This type is used to keep track of the total amount of data transferred as a part of a parallel upload in order
* to coordinate progress reporting to the end user. We need this additional type because we can't keep local state
* directly as lambdas require captured local variables to be effectively final.
*/
private static class ParallelProgressReporter extends ProgressReporterImpl {
/*
This lock will be instantiated by the operation initiating the whole transfer to coordinate each
ProgressReporterImpl.
*/
private final Lock transferLock;

/*
We need an AtomicLong to be able to update the value referenced. Because we are already synchronizing with the
lock, we don't incur any additional performance hit here by the synchronization.
*/
private AtomicLong totalProgress;

ParallelProgressReporter(IProgressReceiver progressReceiver, Lock lock, AtomicLong totalProgress) {
super(progressReceiver);
this.transferLock = lock;
this.totalProgress = totalProgress;
}

@Override
public void reportProgress(long bytesTransferred) {
super.reportProgress(bytesTransferred);

/*
It is typically a bad idea to lock around customer code (which the progressReceiver is) because they could
never release the lock. However, we have decided that it is sufficiently difficult for them to make their
progressReporting code threadsafe that we will take that burden and the ensuing risks. Although it is the
case that only one thread is allowed to be in onNext at once, however there are multiple independent
requests happening at once to stage/download separate chunks, so we still need to lock either way.
*/
transferLock.lock();
this.progressReceiver.reportProgress(this.totalProgress.addAndGet(bytesTransferred));
transferLock.unlock();
}

/*
This is used in the case of retries to rewind the amount of progress reported so as not to over-report at the
end.
*/
@Override
public void rewindProgress() {
/*
Blocks do not interfere with each other's block progress and there is no way that, for a single block, one
thread will be trying to add to the progress while the other is trying to zero it. The updates are strictly
sequential. Avoiding using the lock is ideal.
*/
this.totalProgress.addAndGet(-1 * this.blockProgress);
super.rewindProgress();
}

}

/**
* Adds progress reporting functionality to the given {@code Flowable}. Each subscription (and therefore each
* retry) will rewind the progress reported so as not to over-report. The data reported will be the total amount
* of data emitted so far, or the "current position" of the Flowable.
*
* @param data
* The data whose transfer progress is to be tracked.
* @param progressReceiver
* {@link IProgressReceiver}
*
* @return A {@code Flowable} that emits the same data as the source but calls a callback to report the total amount
* of data emitted so far.
*
* @apiNote ## Sample Code \n
* [!code-java[Sample_Code](../azure-storage-java/src/test/java/com/microsoft/azure/storage/Samples.java?name=progress "Sample code for ProgressReporterFactor.addProgressReporting")] \n
* For more samples, please see the [Samples file](%https://github.com/Azure/azure-storage-java/blob/master/src/test/java/com/microsoft/azure/storage/Samples.java)
*/
public static Flowable<ByteBuffer> addProgressReporting(Flowable<ByteBuffer> data,
IProgressReceiver progressReceiver) {
if (progressReceiver == null) {
return data;
}
else {
ProgressReporterImpl tracker = new SequentialProgressReporter(progressReceiver);
return tracker.addProgressReporting(data);
}
}

static Flowable<ByteBuffer> addParallelProgressReporting(Flowable<ByteBuffer> data,
IProgressReceiver progressReceiver, Lock lock, AtomicLong totalProgress) {
if (progressReceiver == null) {
return data;
}
else {
ParallelProgressReporter tracker = new ParallelProgressReporter(progressReceiver, lock, totalProgress);
return tracker.addProgressReporting(data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ stream, the buffers that were emitted will have already been consumed (their pos
return Single.error(e);
}
}
requestCopy.withContext(httpRequest.context());

// Deadline stuff

Expand Down Expand Up @@ -172,6 +173,7 @@ If attempt was against the secondary & it returned a StatusNotFound (404), then
} else {
action = "NoRetry: Successful HTTP request";
}

logf("Action=%s\n", action);
if (action.charAt(0) == 'R' && attempt < requestRetryOptions.maxTries()) {
/*
Expand Down Expand Up @@ -201,13 +203,14 @@ we do not consider the secondary at all (considerSecondary==false)). This will
"not being replayable. To support retries, all Flowables must produce the " +
"same data for each subscriber. Please ensure this behavior.", throwable));
}
String action;

/*
IOException is a catch-all for IO related errors. Technically it includes many types which may
not be network exceptions, but we should not hit those unless there is a bug in our logic. In
either case, it is better to optimistically retry instead of failing too soon.
A Timeout Exception is a client-side timeout coming from Rx.
*/
String action;
if (throwable instanceof IOException) {
action = "Retry: Network error";
} else if (throwable instanceof TimeoutException) {
Expand All @@ -216,6 +219,8 @@ we do not consider the secondary at all (considerSecondary==false)). This will
action = "NoRetry: Unknown error";
}



logf("Action=%s\n", action);
if (action.charAt(0) == 'R' && attempt < requestRetryOptions.maxTries()) {
/*
Expand All @@ -228,7 +233,6 @@ we do not consider the secondary at all (considerSecondary==false)). This will
primaryTry + 1 : primaryTry;
return attemptAsync(httpRequest, newPrimaryTry, considerSecondary,
attempt + 1);

}
return Single.error(throwable);
});
Expand Down
Loading

0 comments on commit 9a28f04

Please sign in to comment.