Skip to content

Commit

Permalink
Various minor improvements (Azure#396)
Browse files Browse the repository at this point in the history
* Remove Apache Commons dependency

* HttpClientFactory.close() instead of shutdown()

* Use per-instance lock in SharedChannelPool. Fixes Azure#388

* Fix condition wait in SharedChannelPool. Fixes Azure#390

* Disable test for close() with in-flight request

* Fix checkstyle
  • Loading branch information
RikkiGibson authored Mar 6, 2018
1 parent d9512b5 commit 9125441
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 115 deletions.
4 changes: 0 additions & 4 deletions client-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

package com.microsoft.rest.v2.http;

import io.reactivex.Completable;
import java.io.Closeable;

/**
* Creates an HttpClient from a Configuration.
*/
public interface HttpClientFactory {
public interface HttpClientFactory extends Closeable {
/**
* Creates an HttpClient with the given Configuration.
* @param configuration the configuration.
Expand All @@ -20,11 +20,8 @@ public interface HttpClientFactory {
HttpClient create(HttpClientConfiguration configuration);

/**
* Asynchronously awaits completion of in-flight requests,
* then closes shared resources associated with this HttpClient.Factory.
* After this Completable completes, HttpClients created from this Factory can no longer be used.
*
* @return a Completable which shuts down the factory when subscribed to.
* Awaits completion of in-flight requests, then closes shared resources associated with this HttpClient.Factory.
* After this method returns, HttpClients created from this Factory can no longer be used.
*/
Completable shutdown();
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.Flowable;
import io.reactivex.FlowableSubscriber;
import io.reactivex.Scheduler;
Expand All @@ -54,7 +52,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -590,13 +587,8 @@ public HttpClient create(final HttpClientConfiguration configuration) {
}

@Override
public Completable shutdown() {
return Completable.defer(new Callable<CompletableSource>() {
@Override
public CompletableSource call() throws Exception {
return Completable.fromFuture(adapter.shutdownGracefully());
}
});
public void close() {
adapter.shutdownGracefully().awaitUninterruptibly();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.reactivex.exceptions.Exceptions;

import javax.net.ssl.SSLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;

/**
* A Netty channel pool implementation shared between multiple requests.
Expand All @@ -44,7 +45,7 @@ public class SharedChannelPool implements ChannelPool {
private final Queue<ChannelRequest> requests;
private final ConcurrentMultiHashMap<URI, Channel> available;
private final ConcurrentMultiHashMap<URI, Channel> leased;
private final Object sync = -1;
private final Object sync = new Object();
private final SslContext sslContext;
private final ExecutorService executor;
private volatile boolean closed = false;
Expand Down Expand Up @@ -80,82 +81,77 @@ protected void initChannel(Channel ch) throws Exception {
} catch (SSLException e) {
throw new RuntimeException(e);
}
this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "SharedChannelPool-worker");
thread.setDaemon(true);
return thread;
}
this.executor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "SharedChannelPool-worker");
thread.setDaemon(true);
return thread;
});

executor.submit(new Runnable() {
@Override
public void run() {
while (!closed) {
try {
final ChannelRequest request;
final ChannelFuture channelFuture;
synchronized (requests) {
while (requests.isEmpty() && !closed) {
requests.wait();
}
executor.submit(() -> {
while (!closed) {
try {
final ChannelRequest request;
final ChannelFuture channelFuture;
// Synchronizing just to be notified when requests is non-empty
synchronized (requests) {
while (requests.isEmpty() && !closed) {
requests.wait();
}
request = requests.poll();
}
request = requests.poll();

if (leased.size() >= poolSize && !closed) {
synchronized (sync) {
sync.wait();
}
synchronized (sync) {
while (leased.size() >= poolSize && !closed) {
sync.wait();
}

if (closed) {
break;
}
synchronized (sync) {
if (available.containsKey(request.uri)) {
Channel channel = available.poll(request.uri);
if (isChannelHealthy(channel)) {
handler.channelAcquired(channel);
request.promise.setSuccess(channel);
leased.put(request.uri, channel);
continue;
}
}
// Create a new channel - remove an available one if size overflows
if (available.size() > 0 && available.size() + leased.size() >= poolSize) {
available.poll().close();
}
int port;
if (request.uri.getPort() < 0) {
port = "https".equals(request.uri.getScheme()) ? 443 : 80;
} else {
port = request.uri.getPort();

if (available.containsKey(request.uri)) {
Channel channel = available.poll(request.uri);
if (isChannelHealthy(channel)) {
handler.channelAcquired(channel);
request.promise.setSuccess(channel);
leased.put(request.uri, channel);
continue;
}
channelFuture = SharedChannelPool.this.bootstrap.clone().connect(request.uri.getHost(), port);
}
// Create a new channel - remove an available one if size overflows
if (available.size() > 0 && available.size() + leased.size() >= poolSize) {
available.poll().close();
}
int port;
if (request.uri.getPort() < 0) {
port = "https".equals(request.uri.getScheme()) ? 443 : 80;
} else {
port = request.uri.getPort();
}
channelFuture = SharedChannelPool.this.bootstrap.clone().connect(request.uri.getHost(), port);

channelFuture.channel().attr(CHANNEL_URI).set(request.uri);
channelFuture.channel().attr(CHANNEL_URI).set(request.uri);

// Apply SSL handler for https connections
if ("https".equalsIgnoreCase(request.uri.getScheme())) {
channelFuture.channel().pipeline().addFirst(sslContext.newHandler(channelFuture.channel().alloc(), request.uri.getHost(), port));
}
// Apply SSL handler for https connections
if ("https".equalsIgnoreCase(request.uri.getScheme())) {
channelFuture.channel().pipeline().addFirst(sslContext.newHandler(channelFuture.channel().alloc(), request.uri.getHost(), port));
}

channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (channelFuture.isSuccess()) {
handler.channelAcquired(channelFuture.channel());
leased.put(request.uri, channelFuture.channel());
request.promise.setSuccess(channelFuture.channel());
} else {
request.promise.setFailure(channelFuture.cause());
}
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (channelFuture.isSuccess()) {
handler.channelAcquired(channelFuture.channel());
leased.put(request.uri, channelFuture.channel());
request.promise.setSuccess(channelFuture.channel());
} else {
request.promise.setFailure(channelFuture.cause());
}
});
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}
});
Expand Down Expand Up @@ -217,17 +213,12 @@ public Future<Channel> acquire(Promise<Channel> promise) {
* @return a Future representing the operation.
*/
public Future<Void> closeAndRelease(final Channel channel) {
return channel.close().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
release(channel);
}
});
return channel.close().addListener(future -> release(channel));
}

@Override
public Future<Void> release(final Channel channel) {
return this.release(channel, this.bootstrap.config().group().next().<Void>newPromise());
return this.release(channel, this.bootstrap.config().group().next().newPromise());
}

@Override
Expand All @@ -251,6 +242,11 @@ public Future<Void> release(final Channel channel, final Promise<Void> promise)
public void close() {
closed = true;
executor.shutdownNow();
synchronized (requests) {
while (!requests.isEmpty()) {
requests.remove().promise.setFailure(new CancellationException("Channel pool was closed"));
}
}
}

private static class ChannelRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.microsoft.rest.v2.http.HttpResponse;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.net.CookieHandler;
Expand Down Expand Up @@ -53,7 +52,7 @@ public Single<HttpResponse> sendAsync(HttpRequest request) {

Map<String, List<String>> requestCookies = cookies.get(uri, cookieHeaders);
for (Map.Entry<String, List<String>> entry : requestCookies.entrySet()) {
request.headers().set(entry.getKey(), StringUtils.join(entry.getValue(), ","));
request.headers().set(entry.getKey(), String.join(",", entry.getValue()));
}

return next.sendAsync(request).map(new Function<HttpResponse, HttpResponse>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ public void cancellationTest() throws Exception {
final Disposable d = Flowable.range(0, NUM_FILES)
.flatMap(integer ->
service.download100M(String.valueOf(integer), sas)
.flatMapPublisher(RestResponse::body))
.flatMapPublisher(StreamResponse::body))
.subscribe();

Completable.complete().delay(10, TimeUnit.SECONDS)
Expand Down Expand Up @@ -438,15 +438,7 @@ public void testHighParallelism() throws Exception {
return Completable.error(throwable);
}
})
.andThen(service.deleteContainer(integer.toString(), sas).toCompletable()
.onErrorResumeNext(throwable -> {
if (throwable instanceof RestException && ((RestException) throwable).response().statusCode() == 404) {
LoggerFactory.getLogger(getClass()).info("What?");
return Completable.complete();
} else {
return Completable.error(throwable);
}
})))
.andThen(service.deleteContainer(integer.toString(), sas).toCompletable()))
.blockingAwait();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void testRequestBeforeShutdownSucceeds() throws Exception {
HttpRequest request = new HttpRequest("", HttpMethod.GET, new URL("https://httpbin.org/get"), null);

client.sendRequestAsync(request).blockingGet();
factory.shutdown().blockingAwait();
factory.close();
}

@Test
Expand All @@ -30,7 +30,7 @@ public void testRequestAfterShutdownIsRejected() throws Exception {
HttpRequest request = new HttpRequest("", HttpMethod.GET, new URL("https://httpbin.org/get"), null);

LoggerFactory.getLogger(getClass()).info("Closing factory");
factory.shutdown().blockingAwait();
factory.close();

try {
LoggerFactory.getLogger(getClass()).info("Sending request");
Expand All @@ -42,7 +42,7 @@ public void testRequestAfterShutdownIsRejected() throws Exception {
}

@Test
@Ignore("Passes inconsistently due to race condition")
@Ignore("Fails intermittently due to race condition")
public void testInFlightRequestSucceedsAfterCancellation() throws Exception {
// Retry a few times in case shutdown begins before the request is submitted to Netty
for (int i = 0; i < 3; i++) {
Expand All @@ -52,7 +52,7 @@ public void testInFlightRequestSucceedsAfterCancellation() throws Exception {

Future<HttpResponse> asyncResponse = client.sendRequestAsync(request).toFuture();
Thread.sleep(100);
factory.shutdown().blockingAwait();
factory.close();

boolean shouldRetry = false;
try {
Expand All @@ -68,7 +68,7 @@ public void testInFlightRequestSucceedsAfterCancellation() throws Exception {
if (i == 2) {
fail();
} else {
LoggerFactory.getLogger(getClass()).info("Shutdown started before sending request. Retry attempt " + i+1);
LoggerFactory.getLogger(getClass()).info("Shutdown started before sending request. Retry attempt " + (i+1));
}
}
}
Expand Down
5 changes: 0 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.8.11</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>adal4j</artifactId>
Expand Down

0 comments on commit 9125441

Please sign in to comment.