Skip to content

Commit

Permalink
Thread safety, native transports, and async file I/O (Azure#337)
Browse files Browse the repository at this point in the history
* Experimental EpollEventLoopGroup

* Add profiles to pom. Change HttpRequestBody to abstract class.

* Try using getConstructor().getInstance()

* Fix bug with mixed nio/native components

* Get inner exception message if needed

* Fix incorrect string for EpollEventLoopGroup

* Use native transports when Netty sizes manually specified

* Experiment with Rx test file creation

* Add prepareFiles step and properly parallelize

* More test file creation tweaks

* Simplify stress tests and improve file gen

* Fix bug in FileReadFlowable. Add writeContentToFile(Flowable).

* Fix early completion in writeContentToFile

* Write to file in download test. Fix thread safety in ResponseContentFlowable.

* Limit concurrency in download test

* Fix race in writeContentToFile

* Add download/upload streaming test

* Add x86_64 check to pom

* Thread safety improvements

* Change subscribeOn to observeOn and simplify test

* Require users to include native modules in runtime builds

* Make eventLoopGroup and channelPool final

* Fixes from feedback

* Use subscribeOn instead of observeOn for HTTP response content

* Fix typo

* Consolidate NettyAdapter constructors

* ThrottlingInterceptor tweaks
  • Loading branch information
RikkiGibson authored Jan 3, 2018
1 parent 91dc54e commit 56e004f
Show file tree
Hide file tree
Showing 7 changed files with 602 additions and 607 deletions.
46 changes: 46 additions & 0 deletions client-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -131,4 +139,42 @@

</plugins>
</build>

<profiles>
<profile>
<id>osx</id>
<activation>
<os>
<name>Mac OS X</name>
</os>
</activation>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>4.1.19.Final</version>
<classifier>${os.detected.name}-${os.detected.arch}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>

<profile>
<id>linux</id>
<activation>
<os>
<name>Linux</name>
</os>
</activation>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.19.Final</version>
<classifier>${os.detected.name}-${os.detected.arch}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,19 @@
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Function;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/**
* Represents an asynchronous input stream with a content length.
Expand Down Expand Up @@ -71,25 +76,7 @@ public boolean isReplayable() {
* @return The AsyncInputStream.
*/
public static AsyncInputStream create(final AsynchronousFileChannel fileChannel, final long offset, final long length) {
int numChunks = (int) length / CHUNK_SIZE + (length % CHUNK_SIZE == 0 ? 0 : 1);
Flowable<byte[]> fileStream = Flowable.range(0, numChunks).concatMap(new Function<Integer, Flowable<byte[]>>() {
ByteBuffer innerBuf = ByteBuffer.wrap(new byte[CHUNK_SIZE]);

@Override
public Flowable<byte[]> apply(Integer chunkNo) throws Exception {
final long position = offset + (chunkNo * CHUNK_SIZE);
innerBuf.clear();
return Flowable.fromFuture(fileChannel.read(innerBuf, position))
.map(new Function<Integer, byte[]>() {
@Override
public byte[] apply(Integer bytesRead) throws Exception {
int bytesWanted = (int) Math.min(offset + length - position, bytesRead);
return Arrays.copyOf(innerBuf.array(), bytesWanted);
}
});
}
});

Flowable<byte[]> fileStream = new FileReadFlowable(fileChannel, offset, length);
return new AsyncInputStream(fileStream, length, true);
}

Expand All @@ -104,6 +91,82 @@ public static AsyncInputStream create(AsynchronousFileChannel fileChannel) throw
return create(fileChannel, 0, size);
}

private static class FileReadFlowable extends Flowable<byte[]> {
private final AsynchronousFileChannel fileChannel;
private final long offset;
private final long length;

FileReadFlowable(AsynchronousFileChannel fileChannel, long offset, long length) {
this.fileChannel = fileChannel;
this.offset = offset;
this.length = length;
}

@Override
protected void subscribeActual(Subscriber<? super byte[]> s) {
s.onSubscribe(new FileReadSubscription(s));
}

private class FileReadSubscription implements Subscription {
final Subscriber<? super byte[]> subscriber;
final ByteBuffer innerBuf = ByteBuffer.wrap(new byte[CHUNK_SIZE]);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled = false;

// I/O callbacks are serialized, but not guaranteed to happen on the same thread, which makes volatile necessary.
volatile long position = offset;

FileReadSubscription(Subscriber<? super byte[]> subscriber) {
this.subscriber = subscriber;
}

@Override
public void request(long n) {
if (BackpressureHelper.add(requested, n) == 0L) {
doRead();
}
}

void doRead() {
innerBuf.clear();
fileChannel.read(innerBuf, position, null, onReadComplete);
}

private final CompletionHandler<Integer, Object> onReadComplete = new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer bytesRead, Object attachment) {
if (!cancelled) {
if (bytesRead == -1) {
subscriber.onComplete();
} else {
int bytesWanted = (int) Math.min(bytesRead, offset + length - position);
//noinspection NonAtomicOperationOnVolatileField
position += bytesWanted;
subscriber.onNext(Arrays.copyOf(innerBuf.array(), bytesWanted));
if (position >= offset + length) {
subscriber.onComplete();
} else if (requested.decrementAndGet() > 0) {
doRead();
}
}
}
}

@Override
public void failed(Throwable exc, Object attachment) {
if (!cancelled) {
subscriber.onError(exc);
}
}
};

@Override
public void cancel() {
cancelled = true;
}
}
}

/**
* Creates an AsyncInputStream which emits the content of a given InputStream with a known length.
*
Expand Down Expand Up @@ -136,7 +199,7 @@ public void accept(InputStream inputStream, Emitter<byte[]> emitter) throws Exce
emitter.onError(e);
}
}
});
}).observeOn(Schedulers.io());

return new AsyncInputStream(content, contentLength, false);
}
Expand All @@ -149,4 +212,4 @@ public void accept(InputStream inputStream, Emitter<byte[]> emitter) throws Exce
public static AsyncInputStream create(byte[] bytes) {
return new AsyncInputStream(Flowable.just(bytes), bytes.length, true);
}
}
}
Loading

0 comments on commit 56e004f

Please sign in to comment.