diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index 249366ae0977..099128dcae8e 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -87,5 +87,11 @@ 4.13-beta-3 test + + org.apache.httpcomponents + httpclient + 4.5.8 + test + diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java index 4bff9300069e..df7ccb792bac 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java @@ -3,22 +3,15 @@ package com.microsoft.azure.servicebus.management; -import com.microsoft.azure.servicebus.ClientSettings; -import com.microsoft.azure.servicebus.Utils; -import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException; -import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; -import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException; -import com.microsoft.azure.servicebus.primitives.QuotaExceededException; -import com.microsoft.azure.servicebus.primitives.ServerBusyException; -import com.microsoft.azure.servicebus.primitives.ServiceBusException; -import com.microsoft.azure.servicebus.primitives.TimeoutException; -import com.microsoft.azure.servicebus.primitives.Util; +import com.microsoft.azure.servicebus.*; +import com.microsoft.azure.servicebus.primitives.*; import com.microsoft.azure.servicebus.rules.RuleDescription; import java.io.IOException; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; import java.net.URI; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * Synchronous client to perform management operations on Service Bus entities. @@ -27,14 +20,18 @@ public class ManagementClient { private ManagementClientAsync asyncClient; - public ManagementClient(ConnectionStringBuilder connectionStringBuilder) { + public ManagementClient(ConnectionStringBuilder connectionStringBuilder) throws InterruptedException, ServiceBusException { this(connectionStringBuilder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(connectionStringBuilder)); } - public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings) { + public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { this.asyncClient = new ManagementClientAsync(namespaceEndpointURI, clientSettings); } + public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings, DefaultAsyncHttpClientConfig.Builder httpClientBuilder) { + this.asyncClient = new ManagementClientAsync(namespaceEndpointURI, clientSettings, httpClientBuilder); + } + /** * Retrieves information related to the namespace. * Works with any claim (Send/Listen/Manage). diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index fefb9da1cb01..e493a199c973 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -3,17 +3,8 @@ package com.microsoft.azure.servicebus.management; -import com.microsoft.azure.servicebus.ClientSettings; -import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException; -import com.microsoft.azure.servicebus.primitives.ClientConstants; -import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; -import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException; -import com.microsoft.azure.servicebus.primitives.MessagingFactory; -import com.microsoft.azure.servicebus.primitives.QuotaExceededException; -import com.microsoft.azure.servicebus.primitives.ServerBusyException; -import com.microsoft.azure.servicebus.primitives.ServiceBusException; -import com.microsoft.azure.servicebus.primitives.Util; +import com.microsoft.azure.servicebus.*; +import com.microsoft.azure.servicebus.primitives.*; import com.microsoft.azure.servicebus.rules.RuleDescription; import com.microsoft.azure.servicebus.security.SecurityToken; import com.microsoft.azure.servicebus.security.TokenProvider; @@ -24,6 +15,7 @@ import org.asynchttpclient.Request; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.Response; +import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.util.HttpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +30,7 @@ import javax.xml.parsers.ParserConfigurationException; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; +import java.net.*; import java.time.Duration; import java.util.HashMap; import java.util.List; @@ -68,15 +57,18 @@ public class ManagementClientAsync { private static final String USER_AGENT = String.format("%s/%s(%s)", ClientConstants.PRODUCT_NAME, ClientConstants.CURRENT_JAVACLIENT_VERSION, ClientConstants.PLATFORM_INFO); private ClientSettings clientSettings; + private MessagingFactory factory; private URI namespaceEndpointURI; private AsyncHttpClient asyncHttpClient; + private MiscRequestResponseOperationHandler miscRequestResponseHandler; + private List proxies; /** * Creates a new {@link ManagementClientAsync}. * User should call {@link ManagementClientAsync#close()} at the end of life of the client. * @param connectionStringBuilder - connectionStringBuilder containing namespace information and client settings. */ - public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) { + public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) throws InterruptedException, ServiceBusException { this(connectionStringBuilder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(connectionStringBuilder)); } @@ -86,14 +78,93 @@ public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) { * @param namespaceEndpointURI - URI of the namespace connecting to. * @param clientSettings - client settings. */ - public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) { +// public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) { +// this.namespaceEndpointURI = namespaceEndpointURI; +// this.clientSettings = clientSettings; +// DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() +// .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) +// .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); +// this.asyncHttpClient = asyncHttpClient(clientBuilder); +// } + + public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings, DefaultAsyncHttpClientConfig.Builder httpClientBuilder) { + this.namespaceEndpointURI = namespaceEndpointURI; + this.clientSettings = clientSettings; + this.asyncHttpClient = asyncHttpClient(httpClientBuilder); + } + + public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException { this.namespaceEndpointURI = namespaceEndpointURI; this.clientSettings = clientSettings; DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() - .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) - .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); + .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) + .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); + + if(shouldUseProxy(this.namespaceEndpointURI.getHost())){ + InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address(); + String proxyHostName = address.getHostName(); + int proxyPort = address.getPort(); + clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName, proxyPort)); + } + this.asyncHttpClient = asyncHttpClient(clientBuilder); +// CompletableFuture factoryFuture = MessagingFactory.createFromNamespaceEndpointURIAsync(namespaceEndpointURI, clientSettings); +// Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f), MessagingFactory.INTERNAL_THREAD_POOL)); +// if (TRACE_LOGGER.isInfoEnabled()) { +// TRACE_LOGGER.info("Created management client '{}'", namespaceEndpointURI.toString()); +// } + } + + public boolean shouldUseProxy(final String hostName) { + final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT); + final ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + return false; + } + + final List proxies = proxySelector.select(uri); + if (isProxyAddressLegal(proxies)) { + this.proxies = proxies; + return true; + } else { + return false; + } + } + + private static URI createURIFromHostNamePort(final String hostName, final int port) { + return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port)); + } + + private static boolean isProxyAddressLegal(final List proxies) { + // only checks the first proxy in the list + // returns true if it is an InetSocketAddress, which is required for qpid-proton-j library + return proxies != null + && !proxies.isEmpty() + && proxies.get(0).type() == Proxy.Type.HTTP + && proxies.get(0).address() != null + && proxies.get(0).address() instanceof InetSocketAddress; } + private CompletableFuture createInternals(MessagingFactory factory) { + this.factory = factory; + + CompletableFuture postSessionBrowserFuture = MiscRequestResponseOperationHandler.create(factory).thenAcceptAsync((msoh) -> { + this.miscRequestResponseHandler = msoh; + //this.sessionBrowser = new SessionBrowser(factory, queuePath, MessagingEntityType.QUEUE, msoh); + }, MessagingFactory.INTERNAL_THREAD_POOL); + + return CompletableFuture.allOf(postSessionBrowserFuture); + } + + // No op now +// @Override +// CompletableFuture initializeAsync() { +// return CompletableFuture.completedFuture(null); +// } +// +// @Override +// protected CompletableFuture onClose() { +// return this.miscRequestResponseHandler.closeAsync().thenCompose((w) -> this.factory.closeAsync()); +// } /** * Retrieves information related to the namespace. diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java index 6a6a42040fa7..25c2023f53ad 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java @@ -20,7 +20,7 @@ public final class MiscRequestResponseOperationHandler extends ClientEntity { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MiscRequestResponseOperationHandler.class); - + private final Object requestResonseLinkCreationLock = new Object(); private final String entityPath; private final MessagingEntityType entityType; @@ -37,6 +37,10 @@ private MiscRequestResponseOperationHandler(MessagingFactory factory, String lin } @Deprecated + public static CompletableFuture create(MessagingFactory factory) { + return create(factory, null, null); + } + public static CompletableFuture create(MessagingFactory factory, String entityPath) { return create(factory, entityPath, null); } @@ -76,7 +80,7 @@ private CompletableFuture createRequestResponseLink() { return null; }, MessagingFactory.INTERNAL_THREAD_POOL); } - + return this.requestResponseLinkCreationFuture; } } diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java index 7ed6de11467e..3008c2f78ed6 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientSessionTests.java @@ -29,16 +29,16 @@ public abstract class ClientSessionTests extends Tests { private String receiveEntityPath; private IMessageSender sendClient; private IMessageAndSessionPump receiveClient; - + @BeforeClass - public static void init() { + public static void init() throws ServiceBusException, InterruptedException { ClientSessionTests.entityNameCreatedForAllTests = null; ClientSessionTests.receiveEntityPathForAllTest = null; URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); managementClient = new ManagementClientAsync(namespaceEndpointURI, managementClientSettings); } - + @Before public void setup() throws InterruptedException, ExecutionException { if (this.shouldCreateEntityForEveryTest() || ClientSessionTests.entityNameCreatedForAllTests == null) { @@ -72,7 +72,7 @@ public void setup() throws InterruptedException, ExecutionException { this.receiveEntityPath = ClientSessionTests.receiveEntityPathForAllTest; } } - + @After public void tearDown() throws ServiceBusException, InterruptedException, ExecutionException { if (this.sendClient != null) { @@ -85,14 +85,14 @@ public void tearDown() throws ServiceBusException, InterruptedException, Executi ((QueueClient) this.receiveClient).close(); } } - + if (this.shouldCreateEntityForEveryTest()) { managementClient.deleteQueueAsync(this.entityName).get(); } else { TestCommons.drainAllSessions(this.receiveEntityPath, this.isEntityQueue()); } } - + @AfterClass public static void cleanupAfterAllTest() throws ExecutionException, InterruptedException, IOException { if (managementClient != null) { @@ -103,7 +103,7 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE managementClient.close(); } } - + private void createClients(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { if (this.isEntityQueue()) { this.sendClient = new QueueClient(TestUtils.getNamespaceEndpointURI(), this.entityName, TestUtils.getClientSettings(), receiveMode); @@ -113,49 +113,49 @@ private void createClients(ReceiveMode receiveMode) throws InterruptedException, this.receiveClient = new SubscriptionClient(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, TestUtils.getClientSettings(), receiveMode); } } - + @Test public void testRegisterAnotherHandlerAfterSessionHandler() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testRegisterAnotherHandlerAfterSessionHandler(this.receiveClient); } - + @Test public void testGetMessageSessions() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); TestCommons.testGetMessageSessions(this.sendClient, this.receiveClient); } - + @Test public void testSessionPumpAutoCompleteWithOneConcurrentCallPerSession() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithOneConcurrentCallPerSession(this.sendClient, this.receiveClient); } - + @Test public void testReceiveAndDeleteSessionPump() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.RECEIVEANDDELETE); MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithOneConcurrentCallPerSession(this.sendClient, this.receiveClient); } - + @Test public void testSessionPumpAutoCompleteWithMultipleConcurrentCallsPerSession() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithMultipleConcurrentCallsPerSession(this.sendClient, this.receiveClient); } - + @Test public void testSessionPumpClientComplete() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testSessionPumpClientComplete(this.sendClient, this.receiveClient); } - + @Test public void testSessionPumpAbandonOnException() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testSessionPumpAbandonOnException(this.sendClient, this.receiveClient); } - + @Test public void testSessionPumpRenewLock() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientTests.java index b01eead8c842..80f71e27e59d 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientTests.java @@ -29,16 +29,16 @@ public abstract class ClientTests extends Tests { private String receiveEntityPath; protected IMessageSender sendClient; protected IMessageAndSessionPump receiveClient; - + @BeforeClass - public static void init() { + public static void init() throws ServiceBusException, InterruptedException { ClientTests.entityNameCreatedForAllTests = null; ClientTests.receiveEntityPathForAllTest = null; URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); managementClientAsync = new ManagementClientAsync(namespaceEndpointURI, managementClientSettings); } - + @Before public void setup() throws ExecutionException, InterruptedException { if (this.shouldCreateEntityForEveryTest() || ClientTests.entityNameCreatedForAllTests == null) { @@ -70,7 +70,7 @@ public void setup() throws ExecutionException, InterruptedException { this.receiveEntityPath = ClientTests.receiveEntityPathForAllTest; } } - + @After public void tearDown() throws ServiceBusException, InterruptedException, ExecutionException { if (this.sendClient != null) { @@ -83,14 +83,14 @@ public void tearDown() throws ServiceBusException, InterruptedException, Executi ((QueueClient) this.receiveClient).close(); } } - + if (this.shouldCreateEntityForEveryTest()) { managementClientAsync.deleteQueueAsync(this.entityName).get(); } else { TestCommons.drainAllMessages(this.receiveEntityPath); } } - + @AfterClass public static void cleanupAfterAllTest() throws ExecutionException, InterruptedException, IOException { if (managementClientAsync == null) { @@ -102,7 +102,7 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE managementClientAsync.close(); } - + protected void createClients(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { if (this.isEntityQueue()) { this.sendClient = new QueueClient(TestUtils.getNamespaceEndpointURI(), this.entityName, TestUtils.getClientSettings(), receiveMode); @@ -112,37 +112,37 @@ protected void createClients(ReceiveMode receiveMode) throws InterruptedExceptio this.receiveClient = new SubscriptionClient(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, TestUtils.getClientSettings(), receiveMode); } } - + @Test public void testMessagePumpAutoComplete() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testMessagePumpAutoComplete(this.sendClient, this.receiveClient); } - + @Test public void testReceiveAndDeleteMessagePump() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.RECEIVEANDDELETE); MessageAndSessionPumpTests.testMessagePumpAutoComplete(this.sendClient, this.receiveClient); } - + @Test public void testMessagePumpClientComplete() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testMessagePumpClientComplete(this.sendClient, this.receiveClient); } - + @Test public void testMessagePumpAbandonOnException() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testMessagePumpAbandonOnException(this.sendClient, this.receiveClient); } - + @Test public void testMessagePumpRenewLock() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); MessageAndSessionPumpTests.testMessagePumpRenewLock(this.sendClient, this.receiveClient); } - + @Test public void testRegisterAnotherHandlerAfterMessageHandler() throws InterruptedException, ServiceBusException { this.createClients(ReceiveMode.PEEKLOCK); diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java index 4164b8feaf33..28575ddf0c24 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ClientValidationTests.java @@ -32,7 +32,7 @@ public class ClientValidationTests extends TestBase { private static ManagementClientAsync managementClient; @BeforeClass - public static void createEntities() throws ExecutionException, InterruptedException { + public static void createEntities() throws ExecutionException, InterruptedException, ServiceBusException { // Create a queue, a topic and a subscription queuePath = TestUtils.randomizeEntityName(ENTITY_NAME_PREFIX); sessionfulQueuePath = TestUtils.randomizeEntityName(ENTITY_NAME_PREFIX); diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/DaxiTest.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/DaxiTest.java new file mode 100644 index 000000000000..1d9be680ed33 --- /dev/null +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/DaxiTest.java @@ -0,0 +1,276 @@ +package com.microsoft.azure.servicebus; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.microsoft.azure.servicebus.management.ManagementClient; +import com.microsoft.azure.servicebus.management.ManagementClientAsync; +import com.microsoft.azure.servicebus.management.QueueDescription; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.TransportType; +import org.apache.http.HttpHost; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.impl.client.HttpClientBuilder; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Realm; +import org.asynchttpclient.proxy.ProxyServer; +import org.asynchttpclient.proxy.ProxyType; +import org.junit.Assert; +import org.junit.Test; +import sun.net.www.http.HttpClient; + +import java.io.IOException; +import java.net.*; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static java.nio.charset.StandardCharsets.UTF_8; + +public class DaxiTest { + static final Gson GSON = new Gson(); + + @Test + public void queueClientWithProxy() throws Exception { + String proxyHostName = "127.0.0.1"; + int proxyPort = 8888; + final ProxySelector systemDefaultSelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + if (uri != null + && uri.getHost() != null +// && uri.getHost().equalsIgnoreCase(proxyHostName) + ) { + List proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostName, proxyPort))); + return proxies; + } + return systemDefaultSelector.select(uri); + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ + if (uri == null || sa == null || ioe == null) { + throw new IllegalArgumentException("Arguments can't be null."); + } + systemDefaultSelector.connectFailed(uri, sa, ioe); + } + }); + + String connectionString = "Endpoint=sb://anqyan-messaging.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=gqHB0C/IfW/d8uECt5OgHJSAnV7TEDYw2ps6gNURHK0="; + ConnectionStringBuilder connStrBuilder = new ConnectionStringBuilder(connectionString, "javaqueue"); + connStrBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + + QueueClient receiveClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); + // We are using single thread executor as we are only processing one message at a time + ExecutorService executorService = Executors.newSingleThreadExecutor(); + this.registerReceiver(receiveClient, executorService); + + // Create a QueueClient instance for sending and then asynchronously send messages. + // Close the sender once the send operation is complete. + QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); + this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync()); + + // wait for ENTER or 10 seconds elapsing + waitForEnter(10); + + // shut down receiver to close the receive loop + receiveClient.close(); + executorService.shutdown(); + } + + CompletableFuture sendMessagesAsync(QueueClient sendClient) { + List> data = + GSON.fromJson( + "[" + + "{'name' = 'Einstein', 'firstName' = 'Albert'}," + + "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," + + "{'name' = 'Curie', 'firstName' = 'Marie'}," + + "{'name' = 'Hawking', 'firstName' = 'Steven'}," + + "{'name' = 'Newton', 'firstName' = 'Isaac'}," + + "{'name' = 'Bohr', 'firstName' = 'Niels'}," + + "{'name' = 'Faraday', 'firstName' = 'Michael'}," + + "{'name' = 'Galilei', 'firstName' = 'Galileo'}," + + "{'name' = 'Kepler', 'firstName' = 'Johannes'}," + + "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" + + "]", + new TypeToken>>() {}.getType()); + + List tasks = new ArrayList<>(); + for (int i = 0; i < data.size(); i++) { + final String messageId = Integer.toString(i); + Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8)); + message.setContentType("application/json"); + message.setLabel("Scientist"); + message.setMessageId(messageId); + message.setTimeToLive(Duration.ofMinutes(2)); + System.out.printf("\nMessage sending: Id = %s", message.getMessageId()); + tasks.add( + sendClient.sendAsync(message).thenRunAsync(() -> { + System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId()); + })); + } + return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[tasks.size()])); + } + + void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception { + + + // register the RegisterMessageHandler callback with executor service + queueClient.registerMessageHandler(new IMessageHandler() { + // callback invoked when the message handler loop has obtained a message + public CompletableFuture onMessageAsync(IMessage message) { + // receives message is passed to callback + if (message.getLabel() != null && + message.getContentType() != null && + message.getLabel().contentEquals("Scientist") && + message.getContentType().contentEquals("application/json")) { + + byte[] body = message.getBody(); + Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class); + + System.out.printf( + "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", + message.getMessageId(), + message.getSequenceNumber(), + message.getEnqueuedTimeUtc(), + message.getExpiresAtUtc(), + message.getContentType(), + scientist != null ? scientist.get("firstName") : "", + scientist != null ? scientist.get("name") : ""); + } + return CompletableFuture.completedFuture(null); + } + + // callback invoked when the message handler has an exception to report + public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) { + System.out.printf(exceptionPhase + "-" + throwable.getMessage()); + } + }, + // 1 concurrent call, messages are auto-completed, auto-renew duration + new MessageHandlerOptions(1, true, Duration.ofMinutes(1)), + executorService); + + } + + private void waitForEnter(int seconds) { + ExecutorService executor = Executors.newCachedThreadPool(); + try { + executor.invokeAny(Arrays.asList(() -> { + System.in.read(); + return 0; + }, () -> { + Thread.sleep(seconds * 1000); + return 0; + })); + } catch (Exception e) { + // absorb + } + } + + @Test + public void managementClientAsync() throws InterruptedException, ExecutionException, ServiceBusException { + URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); + //System.out.println("hello" + namespaceEndpointURI.getPath()); + ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); + ManagementClientAsync managementClientAsync = new ManagementClientAsync(namespaceEndpointURI, managementClientSettings); + //ManagementClient managementClient = new ManagementClient("Endpoint=sb://contoso.servicebus.onebox.windows-int.net/;SharedAccessKeyName=DefaultNamespaceSasAllKeyName;SharedAccessKey=8864/auVd3qDC75iTjBL1GJ4D2oXC6bIttRd0jzDZ+g="); + String queueName = "managementClientAsync" + UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = managementClientAsync.createQueueAsync(q).get(); + Assert.assertEquals(q, qCreated); + } + + + @Test + public void managementClientWithProxy() throws InterruptedException, ExecutionException, ServiceBusException { + String proxyHostName = "127.0.0.1"; + int proxyPort = 8888; + final ProxySelector systemDefaultSelector = ProxySelector.getDefault(); + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + if (uri != null + && uri.getHost() != null +// && uri.getHost().equalsIgnoreCase(proxyHostName) + ) { + List proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostName, proxyPort))); + return proxies; + } + return systemDefaultSelector.select(uri); + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ + if (uri == null || sa == null || ioe == null) { + throw new IllegalArgumentException("Arguments can't be null."); + } + systemDefaultSelector.connectFailed(uri, sa, ioe); + } + }); + + URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); + ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); + + ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings); + String queueName = "proxy" + UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = managementClient.createQueue(q); + Assert.assertEquals(q, qCreated); + } + + @Test + public void managementClient() throws InterruptedException, ExecutionException, ServiceBusException { + URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); + ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); + DefaultAsyncHttpClientConfig.Builder cfg = new DefaultAsyncHttpClientConfig.Builder(); + ProxyServer.Builder proxy = new ProxyServer.Builder("127.0.0.1", 8888 ); + + DefaultAsyncHttpClientConfig.Builder httpClientBuilder = + cfg + .setUseProxySelector(true) + .setProxyServer(proxy); + + ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings, httpClientBuilder); + String queueName = "NEW" + UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = managementClient.createQueue(q); + Assert.assertEquals(q, qCreated); + } + + + @Test + public void managementClient2() throws InterruptedException, ExecutionException, ServiceBusException { + URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); + //System.out.println("hello" + namespaceEndpointURI.getPath()); + ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); + //DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().build(); + DefaultAsyncHttpClientConfig.Builder cfg = new DefaultAsyncHttpClientConfig.Builder(); + ProxyServer.Builder proxy = new ProxyServer.Builder("http://localhost", 8888 ); //"127.0.0.1" + + DefaultAsyncHttpClientConfig.Builder httpClientBuilder = + cfg + .setProxyServer(proxy); + + // .setUseProxySelector(true) + +// HttpHost httpHost = new HttpHost( "127.0.0.1", 8888 ); +// RequestConfig requestConfig = RequestConfig.custom() +// .setProxy( httpHost ) +// .build(); + + //HttpClientBuilder httpClient = HttpClientBuilder.create().setProxy(httpHost); + ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings, httpClientBuilder); + //ManagementClient managementClient = new ManagementClient("Endpoint=sb://contoso.servicebus.onebox.windows-int.net/;SharedAccessKeyName=DefaultNamespaceSasAllKeyName;SharedAccessKey=8864/auVd3qDC75iTjBL1GJ4D2oXC6bIttRd0jzDZ+g="); + String queueName = "managementClient" + UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = managementClient.createQueue(q); + Assert.assertEquals(q, qCreated); + } +} diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementTests.java index 9bc478bfd44b..25dce36ddaec 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementTests.java @@ -44,12 +44,19 @@ public class ManagementTests extends TestBase { private ManagementClientAsync managementClientAsync; @Before - public void setup() { + public void setup() throws ServiceBusException, InterruptedException { URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); managementClientAsync = new ManagementClientAsync(namespaceEndpointURI, managementClientSettings); } + public void Daxi() throws InterruptedException, ExecutionException { + String queueName = UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = this.managementClientAsync.createQueueAsync(q).get(); + Assert.assertEquals(q, qCreated); + } + @Test public void basicQueueCrudTest() throws InterruptedException, ExecutionException { String queueName = UUID.randomUUID().toString().substring(0, 8); @@ -514,13 +521,13 @@ public void forwardingEntitySetupTest() throws ServiceBusException, InterruptedE this.managementClientAsync.deleteQueueAsync(destinationName); this.managementClientAsync.deleteQueueAsync(dlqDestinationName); } - + @Test public void subscriptionForwardToCreationTest() throws ServiceBusException, InterruptedException { String sourceName = UUID.randomUUID().toString().substring(0, 8); String destinationName = UUID.randomUUID().toString().substring(0, 8); String subscriptionName = "subscription1"; - + TopicDescription destinationTopicDesc = new TopicDescription(destinationName); Utils.completeFuture(this.managementClientAsync.createTopicAsync(destinationTopicDesc)); SubscriptionDescription subDesc = new SubscriptionDescription(destinationName, subscriptionName); diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java index 35f7105193a7..74fd51837174 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SendReceiveTests.java @@ -24,7 +24,7 @@ public abstract class SendReceiveTests extends Tests { static ManagementClientAsync managementClient = null; private static String entityNameCreatedForAllTests = null; private static String receiveEntityPathForAllTest = null; - + MessagingFactory factory; IMessageSender sender; IMessageReceiver receiver; @@ -34,7 +34,7 @@ public abstract class SendReceiveTests extends Tests { private final String sessionId = null; @BeforeClass - public static void init() { + public static void init() throws ServiceBusException, InterruptedException { SendReceiveTests.entityNameCreatedForAllTests = null; SendReceiveTests.receiveEntityPathForAllTest = null; URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); @@ -86,14 +86,14 @@ public void tearDown() throws ServiceBusException, InterruptedException, Executi if (this.sender != null) { this.sender.close(); } - + if (this.receiver != null) { this.receiver.close(); } - + if (this.factory != null) { this.factory.close(); - } + } if (this.shouldCreateEntityForEveryTest()) { managementClient.deleteQueueAsync(this.entityName).get(); @@ -230,7 +230,7 @@ public void testSendReceiveMessageWithVariousPropertyTypes() throws InterruptedE this.receiver = ClientFactory.createMessageReceiverFromEntityPath(factory, this.receiveEntityPath, ReceiveMode.RECEIVEANDDELETE); TestCommons.testSendReceiveMessageWithVariousPropertyTypes(this.sender, this.sessionId, this.receiver); } - + @Test public void testLongPollReceiveOnLinkAbort() throws InterruptedException, ServiceBusException, ExecutionException { diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java index 447019d67a5b..fe50a1ecaa22 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/SessionTests.java @@ -37,7 +37,7 @@ public abstract class SessionTests extends Tests { String receiveEntityPath; @BeforeClass - public static void init() { + public static void init() throws ServiceBusException, InterruptedException { SessionTests.entityNameCreatedForAllTests = null; SessionTests.receiveEntityPathForAllTest = null; URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); @@ -87,19 +87,19 @@ public void tearDown() throws ServiceBusException, InterruptedException, Executi if (!this.shouldCreateEntityForEveryTest()) { this.drainSession(); } - + if (this.sender != null) { this.sender.close(); } - + if (this.session != null) { this.session.close(); } - + if (this.factory != null) { this.factory.close(); - } - + } + if (this.shouldCreateEntityForEveryTest()) { managementClient.deleteQueueAsync(this.entityName).get(); } diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java index c4d01c7c3670..672e2be73a75 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/TestUtils.java @@ -63,7 +63,7 @@ public static ClientSettings getClientSettings() { } return Util.getClientSettingsFromConnectionStringBuilder(namespaceConnectionStringBuilder); } - + // AADTokens cannot yet be used for management operations, sent directly to gateway public static ClientSettings getManagementClientSettings() { return Util.getClientSettingsFromConnectionStringBuilder(namespaceConnectionStringBuilder); @@ -92,9 +92,9 @@ public static String randomizeEntityName(String entityName) { public static String getRandomString() { return UUID.randomUUID().toString(); } - + /** - * Tells this class whether to create an entity for every test and delete it after the test. Creating an entity for every test makes the tests independent of + * Tells this class whether to create an entity for every test and delete it after the test. Creating an entity for every test makes the tests independent of * each other and advisable if the SB namespace allows it. If the namespace doesn't allow creation and deletion of many entities in a short span of time, the suite * will create one entity at the start, uses it for all test and deletes the entity at the end. * @return true if each test should create and delete its own entity. Else return false.