Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions sdk/core/azure-core-http-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,28 @@
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>java-lts</id>
<activation>
<jdk>[11,)</jdk>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
<configuration>
<argLine>
--add-opens com.azure.http.netty/com.azure.core.http.netty=ALL-UNNAMED
--add-opens com.azure.http.netty/com.azure.core.http.netty.implementation=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,24 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.implementation.HttpProxyExceptionHandler;
import com.azure.core.util.CoreUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.proxy.ProxyHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.ByteBufFlux;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.tcp.TcpClient;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.regex.Pattern;

/**
* This class provides a Netty-based implementation for the {@link HttpClient} interface. Creating an instance of this
Expand All @@ -46,11 +39,6 @@
* @see NettyAsyncHttpClientBuilder
*/
class NettyAsyncHttpClient implements HttpClient {
private static final Pattern CHARSET_PATTERN = Pattern.compile("charset=(\\S+)\\b", Pattern.CASE_INSENSITIVE);

private final EventLoopGroup eventLoopGroup;
private final Supplier<ProxyHandler> proxyHandlerSupplier;
private final Pattern nonProxyHostsPattern;
private final boolean disableBufferCopy;

final reactor.netty.http.client.HttpClient nettyClient;
Expand All @@ -59,25 +47,17 @@ class NettyAsyncHttpClient implements HttpClient {
* Creates default NettyAsyncHttpClient.
*/
NettyAsyncHttpClient() {
this(reactor.netty.http.client.HttpClient.create(), null, null, null, false);
this(reactor.netty.http.client.HttpClient.create(), false);
}

/**
* Creates NettyAsyncHttpClient with provided http client.
*
* @param nettyClient the reactor-netty http client
* @param eventLoopGroup {@link EventLoopGroup} that processes requests.
* @param proxyHandlerSupplier Supplier that returns the {@link ProxyHandler} that connects to the configured
* proxy.
* @param disableBufferCopy Determines whether deep cloning of response buffers should be disabled.
*/
NettyAsyncHttpClient(reactor.netty.http.client.HttpClient nettyClient, EventLoopGroup eventLoopGroup,
Supplier<ProxyHandler> proxyHandlerSupplier, String nonProxyHosts, boolean disableBufferCopy) {
NettyAsyncHttpClient(reactor.netty.http.client.HttpClient nettyClient, boolean disableBufferCopy) {
this.nettyClient = nettyClient;
this.eventLoopGroup = eventLoopGroup;
this.proxyHandlerSupplier = proxyHandlerSupplier;
this.nonProxyHostsPattern = (nonProxyHosts == null)
? null
: Pattern.compile(nonProxyHosts, Pattern.CASE_INSENSITIVE);
this.disableBufferCopy = disableBufferCopy;
}

Expand All @@ -90,41 +70,13 @@ public Mono<HttpResponse> send(final HttpRequest request) {
Objects.requireNonNull(request.getUrl(), "'request.getUrl()' cannot be null.");
Objects.requireNonNull(request.getUrl().getProtocol(), "'request.getUrl().getProtocol()' cannot be null.");
return nettyClient
.tcpConfiguration(tcpClient -> configureTcpClient(tcpClient, request.getUrl().getHost()))
.request(HttpMethod.valueOf(request.getHttpMethod().toString()))
.uri(request.getUrl().toString())
.send(bodySendDelegate(request))
.responseConnection(responseDelegate(request, disableBufferCopy))
.single();
}

/*
* Configures the underlying TcpClient that sends the request.
*/
private TcpClient configureTcpClient(TcpClient tcpClient, String host) {
if (eventLoopGroup != null) {
tcpClient = tcpClient.runOn(eventLoopGroup);
}

// Validate that the request should be proxied.
if (nonProxyHostsPattern == null || !nonProxyHostsPattern.matcher(host).matches()) {
ProxyHandler proxyHandler = (proxyHandlerSupplier == null) ? null : proxyHandlerSupplier.get();
if (proxyHandler != null) {
/*
* Configure the request Channel to be initialized with a ProxyHandler. The ProxyHandler is the first
* operation in the pipeline as it needs to handle sending a CONNECT request to the proxy before any
* request data is sent.
*/
tcpClient = tcpClient.bootstrap(bootstrap -> BootstrapHandlers
.updateConfiguration(bootstrap, NettyPipeline.ProxyHandler, (connectionObserver, channel) ->
channel.pipeline().addFirst(NettyPipeline.ProxyHandler, proxyHandler)
.addLast("azure.proxy.exceptionHandler", new HttpProxyExceptionHandler())));
}
}

return tcpClient;
}

/**
* Delegate to send the request content.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@

import com.azure.core.http.ProxyOptions;
import com.azure.core.http.netty.implementation.ChallengeHolder;
import com.azure.core.http.netty.implementation.HttpProxyHandler;
import com.azure.core.http.netty.implementation.DeferredHttpProxyProvider;
import com.azure.core.util.AuthorizationChallengeHandler;
import com.azure.core.util.Configuration;
import com.azure.core.util.logging.ClientLogger;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import reactor.netty.NettyPipeline;
import reactor.netty.channel.BootstrapHandlers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.tcp.ProxyProvider;

import java.nio.ByteBuffer;
import java.util.Objects;
Expand All @@ -32,8 +32,6 @@
* @see HttpClient
*/
public class NettyAsyncHttpClientBuilder {
private static final String INVALID_PROXY_MESSAGE = "Unknown Proxy type '%s' in use. Not configuring Netty proxy.";

private final ClientLogger logger = new ClientLogger(NettyAsyncHttpClientBuilder.class);

private final HttpClient baseHttpClient;
Expand Down Expand Up @@ -94,14 +92,47 @@ public com.azure.core.http.HttpClient build() {
? ProxyOptions.fromConfiguration(buildConfiguration)
: proxyOptions;

String nonProxyHosts = (buildProxyOptions == null) ? null : buildProxyOptions.getNonProxyHosts();
AuthorizationChallengeHandler handler = (buildProxyOptions == null || buildProxyOptions.getUsername() == null)
? null
: new AuthorizationChallengeHandler(buildProxyOptions.getUsername(), buildProxyOptions.getPassword());
AtomicReference<ChallengeHolder> proxyChallengeHolder = new AtomicReference<>();
/*
* Only configure the custom authorization challenge handler and challenge holder when using an authenticated
* HTTP proxy. All other proxying such as SOCKS4, SOCKS5, and anonymous HTTP will use Netty's built-in handlers.
*/
boolean useCustomProxyHandler = shouldUseCustomProxyHandler(buildProxyOptions);
AuthorizationChallengeHandler handler = useCustomProxyHandler
? new AuthorizationChallengeHandler(buildProxyOptions.getUsername(), buildProxyOptions.getPassword())
: null;
AtomicReference<ChallengeHolder> proxyChallengeHolder = useCustomProxyHandler ? new AtomicReference<>() : null;

nettyHttpClient = nettyHttpClient.tcpConfiguration(tcpClient -> {
if (eventLoopGroup != null) {
tcpClient = tcpClient.runOn(eventLoopGroup);
}

// Proxy configurations are present, setup a proxy in Netty.
if (buildProxyOptions != null) {
// Determine if custom handling will be used, otherwise use Netty's built-in handlers.
if (handler != null) {
/*
* Configure the request Channel to be initialized with a ProxyHandler. The ProxyHandler is the
* first operation in the pipeline as it needs to handle sending a CONNECT request to the proxy
* before any request data is sent.
*/
tcpClient = tcpClient.bootstrap(bootstrap -> BootstrapHandlers.updateConfiguration(bootstrap,
NettyPipeline.ProxyHandler, new DeferredHttpProxyProvider(handler, proxyChallengeHolder,
buildProxyOptions)));
} else {
tcpClient = tcpClient.proxy(proxy ->
proxy.type(toReactorNettyProxyType(buildProxyOptions.getType(), logger))
.address(buildProxyOptions.getAddress())
.username(buildProxyOptions.getUsername())
.password(ignored -> buildProxyOptions.getPassword())
.nonProxyHosts(buildProxyOptions.getNonProxyHosts()));
}
}

return new NettyAsyncHttpClient(nettyHttpClient, eventLoopGroup,
() -> getProxyHandler(buildProxyOptions, handler, proxyChallengeHolder), nonProxyHosts, disableBufferCopy);
return tcpClient;
});

return new NettyAsyncHttpClient(nettyHttpClient, disableBufferCopy);
}

/**
Expand Down Expand Up @@ -153,9 +184,9 @@ public NettyAsyncHttpClientBuilder port(int port) {
/**
* Sets the NIO event loop group that will be used to run IO loops.
*
* @deprecated deprecated in favor of {@link #eventLoopGroup(EventLoopGroup)}.
* @param nioEventLoopGroup The {@link NioEventLoopGroup} that will run IO loops.
* @return the updated NettyAsyncHttpClientBuilder object.
* @deprecated deprecated in favor of {@link #eventLoopGroup(EventLoopGroup)}.
*/
@Deprecated
public NettyAsyncHttpClientBuilder nioEventLoopGroup(NioEventLoopGroup nioEventLoopGroup) {
Expand Down Expand Up @@ -193,49 +224,43 @@ public NettyAsyncHttpClientBuilder configuration(Configuration configuration) {
}

/**
* Disables deep copy of response {@link ByteBuffer} into a heap location that is managed by this client as
* opposed to the underlying netty library which may use direct buffer pool.
* Disables deep copy of response {@link ByteBuffer} into a heap location that is managed by this client as opposed
* to the underlying netty library which may use direct buffer pool.
* <br>
* <b>
* Caution: Disabling this is not recommended as it can lead to data corruption if the downstream consumers
* of the response do not handle the byte buffers before netty releases them.
* Caution: Disabling this is not recommended as it can lead to data corruption if the downstream consumers of the
* response do not handle the byte buffers before netty releases them.
* </b>
* If copy is disabled, underlying Netty layer can potentially reclaim byte array backed by the {@code ByteBuffer}
* upon the return of {@code onNext()}. So, users should ensure they process the {@link ByteBuffer} immediately
* and then return.
* upon the return of {@code onNext()}. So, users should ensure they process the {@link ByteBuffer} immediately and
* then return.
*
* {@codesnippet com.azure.core.http.netty.disabled-buffer-copy}
* {@codesnippet com.azure.core.http.netty.disabled-buffer-copy}
*
* @param disableBufferCopy If set to {@code true}, the client built from this builder will not deep-copy
* response {@link ByteBuffer ByteBuffers}.
* @param disableBufferCopy If set to {@code true}, the client built from this builder will not deep-copy response
* {@link ByteBuffer ByteBuffers}.
* @return The updated {@link NettyAsyncHttpClientBuilder} object.
*/
public NettyAsyncHttpClientBuilder disableBufferCopy(boolean disableBufferCopy) {
this.disableBufferCopy = disableBufferCopy;
return this;
}

/*
* Creates a proxy handler based on the passed ProxyOptions.
*/
private ProxyHandler getProxyHandler(ProxyOptions proxyOptions, AuthorizationChallengeHandler challengeHandler,
AtomicReference<ChallengeHolder> proxyChallengeHolder) {
if (proxyOptions == null) {
return null;
}
private static boolean shouldUseCustomProxyHandler(ProxyOptions options) {
return options != null && options.getUsername() != null && options.getType() == ProxyOptions.Type.HTTP;
}

switch (proxyOptions.getType()) {
private static ProxyProvider.Proxy toReactorNettyProxyType(ProxyOptions.Type azureProxyType, ClientLogger logger) {
switch (azureProxyType) {
case HTTP:
return new HttpProxyHandler(proxyOptions.getAddress(), challengeHandler,
proxyChallengeHolder);
return ProxyProvider.Proxy.HTTP;
case SOCKS4:
return new Socks4ProxyHandler(proxyOptions.getAddress(), proxyOptions.getUsername());
return ProxyProvider.Proxy.SOCKS4;
case SOCKS5:
return new Socks5ProxyHandler(proxyOptions.getAddress(), proxyOptions.getUsername(),
proxyOptions.getPassword());
return ProxyProvider.Proxy.SOCKS5;
default:
throw logger.logExceptionAsError(new IllegalStateException(
String.format(INVALID_PROXY_MESSAGE, proxyOptions.getType())));
throw logger.logExceptionAsError(
new IllegalArgumentException("Unknown 'ProxyOptions.Type' enum value"));
}
}
}
Loading