Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,10 +39,7 @@
import javax.xml.parsers.ParserConfigurationException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.*;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
Expand All @@ -68,8 +66,10 @@ public class ManagementClientAsync {
private static final String USER_AGENT = String.format("%s/%s(%s)", ClientConstants.PRODUCT_NAME, ClientConstants.CURRENT_JAVACLIENT_VERSION, ClientConstants.PLATFORM_INFO);

private ClientSettings clientSettings;
private MessagingFactory factory;
private URI namespaceEndpointURI;
private AsyncHttpClient asyncHttpClient;
private List<Proxy> proxies;

/**
* Creates a new {@link ManagementClientAsync}.
Expand All @@ -91,6 +91,8 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett
this.clientSettings = clientSettings;
DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config()
.setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis())
.setUseProxySelector(true)
.setUseProxyProperties(true)
.setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis());
this.asyncHttpClient = asyncHttpClient(clientBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.microsoft.azure.servicebus;

import com.microsoft.azure.servicebus.management.ManagementClient;
import com.microsoft.azure.servicebus.management.QueueDescription;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.TransportType;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.net.*;
import java.util.*;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
import java.net.URI;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ManagementClientProxyTest {
@Ignore
@Test
public void managementClientWithProxy() throws Exception {
String proxyHostName = "127.0.0.1";
int proxyPort = 8888;
final ProxySelector systemDefaultSelector = ProxySelector.getDefault();

ProxySelector.setDefault(new ProxySelector() {
@Override
public List<Proxy> select(URI uri) {
if (uri != null
&& uri.getHost() != null
) {
List<Proxy> proxies = new LinkedList<>();
proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostName, proxyPort)));
return proxies;
}
return systemDefaultSelector.select(uri);
}

@Override
public void connectFailed(URI uri, SocketAddress sa, IOException ioe){
if (uri == null || sa == null || ioe == null) {
throw new IllegalArgumentException("Arguments can't be null.");
}
systemDefaultSelector.connectFailed(uri, sa, ioe);
}
});

URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI();
ClientSettings managementClientSettings = TestUtils.getManagementClientSettings();

ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings);
String queueName = "test" + UUID.randomUUID().toString().substring(0, 8);
QueueDescription q = new QueueDescription(queueName);
QueueDescription qCreated = managementClient.createQueue(q);
Assert.assertEquals(q, qCreated);

// send message
String connectionString = TestUtils.getNamespaceConnectionString();
ConnectionStringBuilder connStrBuilder = new ConnectionStringBuilder(connectionString, queueName);
connStrBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS);

QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK);
Message message = new Message("hello");
sendClient.sendAsync(message).thenRunAsync(() -> sendClient.closeAsync());
waitForEnter(10);
}

private void waitForEnter(int seconds) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
executor.invokeAny(Arrays.asList(() -> {
System.in.read();
return 0;
}, () -> {
Thread.sleep(seconds * 1000);
return 0;
}));
} catch (Exception e) {
// absorb
}
}
}