Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
e3a89a3
Add WebSocketsProxyConnectionHandler.
conniey Sep 30, 2019
c79d40e
Update ReactorHandlerProvider to connect proxy.
conniey Sep 30, 2019
465fab5
Add Objects.requiresNotNull check.
conniey Oct 1, 2019
702e33b
Integration web proxy handler in ReactorConnection.
conniey Oct 1, 2019
f919758
Fix API for MockReactorHandlerProvider.
conniey Oct 1, 2019
e5e4a5f
Adding tests for WebSocketsProxyConnectionHandler.
conniey Oct 1, 2019
2389f8a
Use logger.throw
conniey Oct 1, 2019
1e474f4
Adding tests for ReactorHandlerProvider.
conniey Oct 1, 2019
0a9f868
Adding documentation to WebSocketsProxyConnectionHandler. Using Syste…
conniey Oct 1, 2019
cc6bbc0
Asserting UserAgent string is in connection properties.
conniey Oct 1, 2019
f6aee42
Adding simple proxy for integration tests.
conniey Oct 1, 2019
d245b86
Update IntegrationTestBase to getProxyConfiguration from configuration.
conniey Oct 1, 2019
b776efb
Add ProxyIntegrationTest.
conniey Oct 1, 2019
41b8c1d
Add ProxyReceiveTest.
conniey Oct 1, 2019
b26dc64
Handling PROTON_IO errors.
conniey Oct 1, 2019
972ef14
Add integration tests for proxy configuration.
conniey Oct 2, 2019
5bdfef8
Cleaning up simple proxy.
conniey Oct 2, 2019
170ffc1
Fix checkstyle issues
srnagar Oct 4, 2019
f3862bd
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
conniey Oct 4, 2019
abc5ce9
Fix import types in proxy connection handler.
conniey Oct 4, 2019
6e43678
Ignore tests until simple proxy is fixed.
conniey Oct 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ public enum ErrorCondition {
/**
* Tracking Id for an exception.
*/
TRACKING_ID_PROPERTY("com.microsoft:tracking-id");
TRACKING_ID_PROPERTY("com.microsoft:tracking-id"),
Comment thread
srnagar marked this conversation as resolved.
/**
* IO exceptions that occur in proton-j library.
*/
PROTON_IO("proton:io");

private static final Map<String, ErrorCondition> ERROR_CONSTANT_MAP = new HashMap<>();
private final String errorCondition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public static Exception toException(String errorCondition, String description, E
}

final ErrorCondition condition = ErrorCondition.fromString(errorCondition);

if (condition == null) {
throw new IllegalArgumentException(String.format(Locale.ROOT, "'%s' is not a known ErrorCondition.",
errorCondition));
Expand All @@ -45,6 +44,7 @@ public static Exception toException(String errorCondition, String description, E
case INTERNAL_ERROR:
case LINK_DETACH_FORCED:
case CONNECTION_FORCED:
case PROTON_IO:
isTransient = true;
break;
case ENTITY_DISABLED_ERROR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
"'tokenManagerProvider' cannot be null.");
this.messageSerializer = messageSerializer;
this.handler = handlerProvider.createConnectionHandler(connectionId, connectionOptions.getHostname(),
connectionOptions.getTransportType());
connectionOptions.getTransportType(), connectionOptions.getProxyConfiguration());
this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry());

this.connectionMono = Mono.fromCallable(this::getOrCreateConnection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.amqp.implementation.handler.WebSocketsConnectionHandler;
import com.azure.core.amqp.implementation.handler.WebSocketsProxyConnectionHandler;
import com.azure.core.amqp.models.ProxyConfiguration;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.reactor.Reactor;

Expand Down Expand Up @@ -40,13 +42,20 @@ public ReactorHandlerProvider(ReactorProvider provider) {
* @param transportType Transport type used for the connection.
* @return A new {@link ConnectionHandler}.
*/
public ConnectionHandler createConnectionHandler(String connectionId, String hostname,
TransportType transportType) {
public ConnectionHandler createConnectionHandler(String connectionId, String hostname, TransportType transportType,
ProxyConfiguration proxyConfiguration) {
switch (transportType) {
case AMQP:
return new ConnectionHandler(connectionId, hostname);
case AMQP_WEB_SOCKETS:
return new WebSocketsConnectionHandler(connectionId, hostname);
if (proxyConfiguration != null && proxyConfiguration.isProxyAddressConfigured()) {
return new WebSocketsProxyConnectionHandler(connectionId, hostname, proxyConfiguration);
} else if (WebSocketsProxyConnectionHandler.shouldUseProxy(hostname)) {
logger.info("System default proxy configured for hostname '{}'. Using proxy.", hostname);
return new WebSocketsProxyConnectionHandler(connectionId, hostname, ProxyConfiguration.SYSTEM_DEFAULTS);
} else {
return new WebSocketsConnectionHandler(connectionId, hostname);
}
default:
throw logger.logExceptionAsWarning(new IllegalArgumentException(String.format(Locale.US,
"This transport type '%s' is not supported.", transportType)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ public class ConnectionHandler extends Handler {
* Creates a handler that handles proton-j's connection events.
*
* @param connectionId Identifier for this connection.
* @param hostname Hostname to use for socket creation. If there is a proxy configured, this could be a proxy's
* IP address.
* @param hostname Hostname of the AMQP message broker to create a connection to.
*/
public ConnectionHandler(final String connectionId, final String hostname) {
this(connectionId, hostname, new ClientLogger(ConnectionHandler.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public class WebSocketsConnectionHandler extends ConnectionHandler {
* Creates a handler that handles proton-j's connection events using web sockets.
*
* @param connectionId Identifier for this connection.
* @param hostname Hostname to use for socket creation. If there is a proxy configured, this could be a
* proxy's IP address.
* @param hostname Hostname to use for socket creation.
*/
public WebSocketsConnectionHandler(final String connectionId, final String hostname) {
super(connectionId, hostname, new ClientLogger(WebSocketsConnectionHandler.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.AmqpErrorCode;
import com.azure.core.amqp.models.ProxyConfiguration;
import com.azure.core.implementation.util.ImplUtils;
import com.azure.core.util.logging.ClientLogger;
import com.microsoft.azure.proton.transport.proxy.ProxyAuthenticationType;
import com.microsoft.azure.proton.transport.proxy.ProxyHandler;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.TransportInternal;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

/**
* Creates an AMQP connection using web sockets (port 443) and connects through a proxy.
*/
public class WebSocketsProxyConnectionHandler extends WebSocketsConnectionHandler {
private static final String HTTPS_URI_FORMAT = "https://%s:%s";
private static final String PROXY_SELECTOR_HAS_BEEN_MODIFIED = "ProxySelector has been modified.";

private final ClientLogger logger = new ClientLogger(WebSocketsProxyConnectionHandler.class);
private final String amqpHostname;
private final ProxyConfiguration proxyConfiguration;

/**
* Creates a handler that handles proton-j's connection through a proxy using web sockets.
*
* @param connectionId Identifier for this connection.
* @param amqpHostname Hostname of the AMQP message broker. The hostname of the proxy is exposed in {@link
* #getHostname()}.
* @throws NullPointerException if {@code amqpHostname} or {@code proxyConfiguration} is null.
*/
public WebSocketsProxyConnectionHandler(String connectionId, String amqpHostname,
ProxyConfiguration proxyConfiguration) {
super(connectionId, amqpHostname);
this.amqpHostname = Objects.requireNonNull(amqpHostname, "'amqpHostname' cannot be null.");
this.proxyConfiguration = Objects.requireNonNull(proxyConfiguration, "'proxyConfiguration' is required.");
Comment thread
conniey marked this conversation as resolved.
Outdated
}

/**
* Looks through system defined proxies to see if one should be used for connecting to the message broker.
*
* @param hostname Hostname for the AMQP message broker.
*
* @return {@code true} if a proxy should be used to connect to the AMQP message broker and null otherwise.
*/
public static boolean shouldUseProxy(final String hostname) {
Objects.requireNonNull(hostname, "'hostname' cannot be null.");

final URI uri = createURI(hostname, HTTPS_PORT);
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
return false;
}

final List<Proxy> proxies = proxySelector.select(uri);
return isProxyAddressLegal(proxies);
}

@Override
public String getHostname() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getHostString();
}

@Override
public int getProtocolPort() {
final InetSocketAddress socketAddress = getProxyAddress();
return socketAddress.getPort();
}

@Override
public void onTransportError(Event event) {
super.onTransportError(event);

final Transport transport = event.getTransport();
final Connection connection = event.getConnection();
if (connection == null || transport == null) {
return;
}

final ErrorCondition errorCondition = transport.getCondition();
if (errorCondition == null || !(errorCondition.getCondition().equals(ConnectionError.FRAMING_ERROR)
|| errorCondition.getCondition().equals(AmqpErrorCode.PROTON_IO_ERROR))) {
return;
}

final String hostName = event.getReactor().getConnectionAddress(connection);
final ProxySelector proxySelector = ProxySelector.getDefault();
final boolean isProxyConfigured = proxySelector != null
|| (proxyConfiguration != null && proxyConfiguration.isProxyAddressConfigured());

if (!isProxyConfigured || ImplUtils.isNullOrEmpty(hostName)) {
return;
}

final String[] hostNameParts = hostName.split(":");
if (hostNameParts.length != 2) {
return;
Comment thread
conniey marked this conversation as resolved.
}

int port;
try {
port = Integer.parseInt(hostNameParts[1]);
} catch (NumberFormatException ignore) {
return;
Comment thread
conniey marked this conversation as resolved.
}

// since proton library communicates all errors based on amqp-error-condition
// it swallows the IOException and translates it to proton-io errorCode
// we reconstruct the IOException in this case - but, callstack is lost
final IOException ioException = new IOException(errorCondition.getDescription());
final URI url = createURI(amqpHostname, getProtocolPort());
final InetSocketAddress address = new InetSocketAddress(hostNameParts[0], port);

logger.error(String.format("Failed to connect to url: '%s', proxy host: '%s'",
url.toString(), address.getHostString()), ioException);

if (proxySelector != null) {
proxySelector.connectFailed(url, address, ioException);
}
}

@Override
protected void addTransportLayers(final Event event, final TransportInternal transport) {
super.addTransportLayers(event, transport);

// Checking that the proxy configuration is not null and not equal to the system defaults option.
final ProxyImpl proxy = proxyConfiguration != null && !(proxyConfiguration == ProxyConfiguration.SYSTEM_DEFAULTS)
? new ProxyImpl(getProtonConfiguration())
: new ProxyImpl();

// host name used to create proxy connect request
// after creating the socket to proxy
final String hostname = event.getConnection().getHostname();
final ProxyHandler proxyHandler = new ProxyHandlerImpl();
proxy.configure(hostname, null, proxyHandler, transport);

transport.addTransportLayer(proxy);

logger.info("addProxyHandshake: hostname[{}]", hostname);
}

private InetSocketAddress getProxyAddress() {
if (proxyConfiguration != null && proxyConfiguration.isProxyAddressConfigured()) {
return (InetSocketAddress) proxyConfiguration.getProxyAddress().address();
}

final URI serviceUri = createURI(amqpHostname, HTTPS_PORT);
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
throw logger.logExceptionAsError(new IllegalStateException(PROXY_SELECTOR_HAS_BEEN_MODIFIED));
}

final List<Proxy> proxies = proxySelector.select(serviceUri);
if (!isProxyAddressLegal(proxies)) {
throw logger.logExceptionAsError(new IllegalStateException(PROXY_SELECTOR_HAS_BEEN_MODIFIED));
}

final Proxy proxy = proxies.get(0);
return (InetSocketAddress) proxy.address();
}

private com.microsoft.azure.proton.transport.proxy.ProxyConfiguration getProtonConfiguration() {
final ProxyAuthenticationType type = getProtonAuthType(proxyConfiguration.getAuthentication());
final String username = proxyConfiguration.hasUserDefinedCredentials()
? proxyConfiguration.getCredential().getUserName()
: null;
final String password = proxyConfiguration.hasUserDefinedCredentials()
? new String(proxyConfiguration.getCredential().getPassword())
: null;

return new com.microsoft.azure.proton.transport.proxy.ProxyConfiguration(type,
proxyConfiguration.getProxyAddress(), username, password);
}

private ProxyAuthenticationType getProtonAuthType(com.azure.core.amqp.models.ProxyAuthenticationType type) {
Comment thread
conniey marked this conversation as resolved.
Outdated
switch (type) {
case DIGEST:
return ProxyAuthenticationType.DIGEST;
case BASIC:
return ProxyAuthenticationType.BASIC;
case NONE:
return ProxyAuthenticationType.NONE;
default:
throw logger.logExceptionAsError(new IllegalArgumentException(
"This authentication type is unknown:" + type.name()));
}
}

private static URI createURI(final String hostname, final int port) {
return URI.create(String.format(Locale.ROOT, HTTPS_URI_FORMAT, hostname, port));
}

/**
* This always selects the first proxy in the list instead of going through all the available ones.
*
* @param proxies List of proxies available.
*
* @return {@code true} if the first proxy in the list is an HTTP proxy and is an IP address.
*/
private static boolean isProxyAddressLegal(final List<Proxy> proxies) {
// we look only at the first proxy in the list
// if the proxy can be translated to InetSocketAddress
// only then - can we parse it to hostName and Port
// which is required by qpid-proton-j library reactor.connectToHost() API
return proxies != null
Comment thread
conniey marked this conversation as resolved.
Outdated
&& !proxies.isEmpty()
&& proxies.get(0).type() == Proxy.Type.HTTP
&& proxies.get(0).address() != null
&& proxies.get(0).address() instanceof InetSocketAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.amqp.models.ProxyConfiguration;

import java.time.Duration;

Expand All @@ -32,7 +33,8 @@ public SessionHandler createSessionHandler(String connectionId, String hostname,
}

@Override
public ConnectionHandler createConnectionHandler(String connectionId, String hostname, TransportType transportType) {
public ConnectionHandler createConnectionHandler(String connectionId, String hostname, TransportType transportType,
ProxyConfiguration configuration) {
return connectionHandler;
}

Expand Down
Loading