From 43a71763adbbbe7d93c3fde0f82ffe4e1cb89c50 Mon Sep 17 00:00:00 2001 From: dorothy Date: Wed, 25 Mar 2020 12:54:01 -0700 Subject: [PATCH 1/8] getMessageSessions doc update --- .../com/microsoft/azure/servicebus/IMessageSessionEntity.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java index 13e0fa6a27c1..ff8ed923c9d5 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java @@ -35,6 +35,8 @@ public interface IMessageSessionEntity { /** * Gets the message sessions, enabling you to browse sessions on queues. + * Only sessions with active messages in the queue are returned. + * The sessions on the deadletter queue or sessions having a SessionState as 'active' and no messages are not returned with this call. * * @return A collection of sessions. * @throws InterruptedException if the current thread was interrupted while waiting. From 1393e1a09413123aa5cacc2a9cbeab44afef3282 Mon Sep 17 00:00:00 2001 From: dorothy Date: Wed, 25 Mar 2020 15:09:45 -0700 Subject: [PATCH 2/8] Revert "getMessageSessions doc update" This reverts commit 43a71763adbbbe7d93c3fde0f82ffe4e1cb89c50. --- .../com/microsoft/azure/servicebus/IMessageSessionEntity.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java index ff8ed923c9d5..13e0fa6a27c1 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/IMessageSessionEntity.java @@ -35,8 +35,6 @@ public interface IMessageSessionEntity { /** * Gets the message sessions, enabling you to browse sessions on queues. - * Only sessions with active messages in the queue are returned. - * The sessions on the deadletter queue or sessions having a SessionState as 'active' and no messages are not returned with this call. * * @return A collection of sessions. * @throws InterruptedException if the current thread was interrupted while waiting. From d64438077b4aaf172307a621ae39623558a371c2 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Tue, 12 May 2020 16:31:32 -0700 Subject: [PATCH 3/8] Enable ManagementClient to use proxy --- .../microsoft-azure-servicebus/pom.xml | 6 ++ .../management/ManagementClientAsync.java | 47 +++++++++++++-- .../servicebus/ManagementClientProxyTest.java | 57 +++++++++++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index 249366ae0977..71c61497253e 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -87,5 +87,11 @@ 4.13-beta-3 test + + com.microsoft.azure + azure-servicebus + 2.0.0-PREVIEW-7 + test + diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index fefb9da1cb01..d0505f4166d9 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -24,6 +24,7 @@ import org.asynchttpclient.Request; import org.asynchttpclient.RequestBuilder; import org.asynchttpclient.Response; +import org.asynchttpclient.proxy.ProxyServer; import org.asynchttpclient.util.HttpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +39,7 @@ import javax.xml.parsers.ParserConfigurationException; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; +import java.net.*; import java.time.Duration; import java.util.HashMap; import java.util.List; @@ -68,8 +66,10 @@ public class ManagementClientAsync { private static final String USER_AGENT = String.format("%s/%s(%s)", ClientConstants.PRODUCT_NAME, ClientConstants.CURRENT_JAVACLIENT_VERSION, ClientConstants.PLATFORM_INFO); private ClientSettings clientSettings; + private MessagingFactory factory; private URI namespaceEndpointURI; private AsyncHttpClient asyncHttpClient; + private List proxies; /** * Creates a new {@link ManagementClientAsync}. @@ -92,9 +92,48 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); + + if(shouldUseProxy(this.namespaceEndpointURI.getHost())){ + InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address(); + String proxyHostName=address.getHostName(); + int proxyPort=address.getPort(); + + clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName,proxyPort)); + } + this.asyncHttpClient = asyncHttpClient(clientBuilder); } + public boolean shouldUseProxy(final String hostName) { + final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT); + final ProxySelector proxySelector = ProxySelector.getDefault(); + if (proxySelector == null) { + return false; + } + + final List proxies = proxySelector.select(uri); + if (isProxyAddressLegal(proxies)) { + this.proxies = proxies; + return true; + } else { + return false; + } + } + + private static URI createURIFromHostNamePort(final String hostName, final int port) { + return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port)); + } + + private static boolean isProxyAddressLegal(final List proxies) { + // only checks the first proxy in the list + // returns true if it is an InetSocketAddress, which is required for qpid-proton-j library + return proxies != null + && !proxies.isEmpty() + && proxies.get(0).type() == Proxy.Type.HTTP + && proxies.get(0).address() != null + && proxies.get(0).address() instanceof InetSocketAddress; + } + /** * Retrieves information related to the namespace. * Works with any claim (Send/Listen/Manage). diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java new file mode 100644 index 000000000000..3265905bc241 --- /dev/null +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java @@ -0,0 +1,57 @@ +package com.microsoft.azure.servicebus; + +import com.microsoft.azure.servicebus.management.ManagementClient; +import com.microsoft.azure.servicebus.management.QueueDescription; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import org.junit.Assert; +import org.junit.Test; +import java.io.IOException; +import java.net.*; +import java.util.*; +import java.net.InetSocketAddress; +import java.net.ProxySelector; +import java.net.URI; +import java.util.LinkedList; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +public class ManagementClientProxyTest { + @Test + public void managementClientWithProxy() throws InterruptedException, ServiceBusException { + String proxyHostName = "127.0.0.1"; + int proxyPort = 8888; + final ProxySelector systemDefaultSelector = ProxySelector.getDefault(); + + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + if (uri != null + && uri.getHost() != null +// && uri.getHost().equalsIgnoreCase(proxyHostName) + ) { + List proxies = new LinkedList<>(); + proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostName, proxyPort))); + return proxies; + } + return systemDefaultSelector.select(uri); + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ + if (uri == null || sa == null || ioe == null) { + throw new IllegalArgumentException("Arguments can't be null."); + } + systemDefaultSelector.connectFailed(uri, sa, ioe); + } + }); + + URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI(); + ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); + + ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings); + String queueName = "proxy" + UUID.randomUUID().toString().substring(0, 8); + QueueDescription q = new QueueDescription(queueName); + QueueDescription qCreated = managementClient.createQueue(q); + Assert.assertEquals(q, qCreated); + } +} From f3eab7a8d86c30426152db8dcd8bb28f395abf8a Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Tue, 19 May 2020 15:33:14 -0700 Subject: [PATCH 4/8] draft changes --- .../microsoft-azure-servicebus/pom.xml | 18 +++ .../management/ManagementClientAsync.java | 16 +- .../servicebus/ManagementClientProxyTest.java | 153 +++++++++++++++++- 3 files changed, 177 insertions(+), 10 deletions(-) diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index 71c61497253e..fc4933b3a978 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -93,5 +93,23 @@ 2.0.0-PREVIEW-7 test + + com.squareup.retrofit2 + retrofit + 2.6.2 + test + + + com.google.guava + guava + 28.2-jre + test + + + ch.qos.logback + logback-core + 1.1.2 + test + diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index d0505f4166d9..84b42d3062d0 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -91,15 +91,17 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett this.clientSettings = clientSettings; DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) + .setUseProxySelector(true) +// .setUseProxyProperties(true) .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); - if(shouldUseProxy(this.namespaceEndpointURI.getHost())){ - InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address(); - String proxyHostName=address.getHostName(); - int proxyPort=address.getPort(); - - clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName,proxyPort)); - } +// if(shouldUseProxy(this.namespaceEndpointURI.getHost())){ +// InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address(); +// String proxyHostName=address.getHostName(); +// int proxyPort=address.getPort(); +// +// clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName,proxyPort)); +// } this.asyncHttpClient = asyncHttpClient(clientBuilder); } diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java index 3265905bc241..b34f41dd3b41 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java @@ -1,23 +1,49 @@ package com.microsoft.azure.servicebus; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.QueueDescription; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.TransportType; import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.net.*; +import java.time.Duration; import java.util.*; import java.net.InetSocketAddress; import java.net.ProxySelector; import java.net.URI; import java.util.LinkedList; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static java.nio.charset.StandardCharsets.UTF_8; public class ManagementClientProxyTest { + static final Gson GSON = new Gson(); + + @Test + public void queueSend() throws Exception { + String connectionString = "Endpoint=sb://anqyan-messaging.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=gqHB0C/IfW/d8uECt5OgHJSAnV7TEDYw2ps6gNURHK0="; + ConnectionStringBuilder connStrBuilder = new ConnectionStringBuilder(connectionString, "javaqueue");//queueName + connStrBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + + QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); + Message message = new Message("hello"); + sendClient.sendAsync(message).thenRunAsync(() -> + System.out.printf("Done") + ); + waitForEnter(10); + } + @Test - public void managementClientWithProxy() throws InterruptedException, ServiceBusException { + public void managementClientWithProxy() throws Exception { String proxyHostName = "127.0.0.1"; int proxyPort = 8888; final ProxySelector systemDefaultSelector = ProxySelector.getDefault(); @@ -27,7 +53,6 @@ public void managementClientWithProxy() throws InterruptedException, ServiceBusE public List select(URI uri) { if (uri != null && uri.getHost() != null -// && uri.getHost().equalsIgnoreCase(proxyHostName) ) { List proxies = new LinkedList<>(); proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHostName, proxyPort))); @@ -49,9 +74,131 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings); - String queueName = "proxy" + UUID.randomUUID().toString().substring(0, 8); + String queueName = "lll" + UUID.randomUUID().toString().substring(0, 8); QueueDescription q = new QueueDescription(queueName); QueueDescription qCreated = managementClient.createQueue(q); Assert.assertEquals(q, qCreated); + + // send message + String connectionString = TestUtils.getNamespaceConnectionString(); + ConnectionStringBuilder connStrBuilder = new ConnectionStringBuilder(connectionString, queueName); + connStrBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS); + + QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); + Message message = new Message("hello"); + sendClient.sendAsync(message).thenRunAsync(() -> + //System.out.printf("Done") + sendClient.closeAsync() +); + waitForEnter(10); + +// QueueClient receiveClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); +// // We are using single thread executor as we are only processing one message at a time +// ExecutorService executorService = Executors.newSingleThreadExecutor(); +// this.registerReceiver(receiveClient, executorService); +// +// // Create a QueueClient instance for sending and then asynchronously send messages. +// // Close the sender once the send operation is complete. +// QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); +// this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync()); +//// Message message = new Message(GSON.toJson("Sent Message Via Proxy. ").getBytes(UTF_8)); +//// sendClient.sendAsync(message).thenRunAsync(sendClient::closeAsync); +// +// // wait for ENTER or 10 seconds elapsing +// waitForEnter(10); +// +// // shut down receiver to close the receive loop +// receiveClient.close(); +// executorService.shutdown(); + } + + CompletableFuture sendMessagesAsync(QueueClient sendClient) { + List> data = + GSON.fromJson( + "[" + + "{'name' = 'Einstein', 'firstName' = 'Albert'}," + + "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," + + "{'name' = 'Curie', 'firstName' = 'Marie'}," + + "{'name' = 'Hawking', 'firstName' = 'Steven'}," + + "{'name' = 'Newton', 'firstName' = 'Isaac'}," + + "{'name' = 'Bohr', 'firstName' = 'Niels'}," + + "{'name' = 'Faraday', 'firstName' = 'Michael'}," + + "{'name' = 'Galilei', 'firstName' = 'Galileo'}," + + "{'name' = 'Kepler', 'firstName' = 'Johannes'}," + + "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" + + "]", + new TypeToken>>() {}.getType()); + + List tasks = new ArrayList<>(); + for (int i = 0; i < data.size(); i++) { + final String messageId = Integer.toString(i); + Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8)); + message.setContentType("application/json"); + message.setLabel("Scientist"); + message.setMessageId(messageId); + message.setTimeToLive(Duration.ofMinutes(2)); + System.out.printf("\nMessage sending: Id = %s", message.getMessageId()); + tasks.add( + sendClient.sendAsync(message).thenRunAsync(() -> { + System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId()); + })); + } + return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[tasks.size()])); + } + + void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception { + + + // register the RegisterMessageHandler callback with executor service + queueClient.registerMessageHandler(new IMessageHandler() { + // callback invoked when the message handler loop has obtained a message + public CompletableFuture onMessageAsync(IMessage message) { + // receives message is passed to callback + if (message.getLabel() != null && + message.getContentType() != null && + message.getLabel().contentEquals("Scientist") && + message.getContentType().contentEquals("application/json")) { + + byte[] body = message.getBody(); + Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class); + + System.out.printf( + "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + + "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", + message.getMessageId(), + message.getSequenceNumber(), + message.getEnqueuedTimeUtc(), + message.getExpiresAtUtc(), + message.getContentType(), + scientist != null ? scientist.get("firstName") : "", + scientist != null ? scientist.get("name") : ""); + } + return CompletableFuture.completedFuture(null); + } + + // callback invoked when the message handler has an exception to report + public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) { + System.out.printf(exceptionPhase + "-" + throwable.getMessage()); + } + }, + // 1 concurrent call, messages are auto-completed, auto-renew duration + new MessageHandlerOptions(1, true, Duration.ofMinutes(1)), + executorService); + + } + + private void waitForEnter(int seconds) { + ExecutorService executor = Executors.newCachedThreadPool(); + try { + executor.invokeAny(Arrays.asList(() -> { + System.in.read(); + return 0; + }, () -> { + Thread.sleep(seconds * 1000); + return 0; + })); + } catch (Exception e) { + // absorb + } } } From def43718804ec39c32255bfc204a4914e9a88751 Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Tue, 19 May 2020 15:42:28 -0700 Subject: [PATCH 5/8] working test version and modified managementclientasync --- .../management/ManagementClientAsync.java | 40 ------ .../servicebus/ManagementClientProxyTest.java | 125 +----------------- 2 files changed, 2 insertions(+), 163 deletions(-) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index 84b42d3062d0..f4ac9de2a755 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -92,50 +92,10 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) .setUseProxySelector(true) -// .setUseProxyProperties(true) .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); - -// if(shouldUseProxy(this.namespaceEndpointURI.getHost())){ -// InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address(); -// String proxyHostName=address.getHostName(); -// int proxyPort=address.getPort(); -// -// clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName,proxyPort)); -// } - this.asyncHttpClient = asyncHttpClient(clientBuilder); } - public boolean shouldUseProxy(final String hostName) { - final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT); - final ProxySelector proxySelector = ProxySelector.getDefault(); - if (proxySelector == null) { - return false; - } - - final List proxies = proxySelector.select(uri); - if (isProxyAddressLegal(proxies)) { - this.proxies = proxies; - return true; - } else { - return false; - } - } - - private static URI createURIFromHostNamePort(final String hostName, final int port) { - return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port)); - } - - private static boolean isProxyAddressLegal(final List proxies) { - // only checks the first proxy in the list - // returns true if it is an InetSocketAddress, which is required for qpid-proton-j library - return proxies != null - && !proxies.isEmpty() - && proxies.get(0).type() == Proxy.Type.HTTP - && proxies.get(0).address() != null - && proxies.get(0).address() instanceof InetSocketAddress; - } - /** * Retrieves information related to the namespace. * Works with any claim (Send/Listen/Manage). diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java index b34f41dd3b41..23fe334665b0 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java @@ -1,47 +1,23 @@ package com.microsoft.azure.servicebus; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import com.microsoft.azure.servicebus.management.ManagementClient; import com.microsoft.azure.servicebus.management.QueueDescription; import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.primitives.ServiceBusException; import com.microsoft.azure.servicebus.primitives.TransportType; import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.net.*; -import java.time.Duration; import java.util.*; import java.net.InetSocketAddress; import java.net.ProxySelector; import java.net.URI; import java.util.LinkedList; import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static java.nio.charset.StandardCharsets.UTF_8; - public class ManagementClientProxyTest { - static final Gson GSON = new Gson(); - - @Test - public void queueSend() throws Exception { - String connectionString = "Endpoint=sb://anqyan-messaging.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=gqHB0C/IfW/d8uECt5OgHJSAnV7TEDYw2ps6gNURHK0="; - ConnectionStringBuilder connStrBuilder = new ConnectionStringBuilder(connectionString, "javaqueue");//queueName - connStrBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS); - - QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); - Message message = new Message("hello"); - sendClient.sendAsync(message).thenRunAsync(() -> - System.out.printf("Done") - ); - waitForEnter(10); - } - @Test public void managementClientWithProxy() throws Exception { String proxyHostName = "127.0.0.1"; @@ -74,7 +50,7 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ ClientSettings managementClientSettings = TestUtils.getManagementClientSettings(); ManagementClient managementClient = new ManagementClient(namespaceEndpointURI, managementClientSettings); - String queueName = "lll" + UUID.randomUUID().toString().substring(0, 8); + String queueName = "test" + UUID.randomUUID().toString().substring(0, 8); QueueDescription q = new QueueDescription(queueName); QueueDescription qCreated = managementClient.createQueue(q); Assert.assertEquals(q, qCreated); @@ -86,105 +62,8 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe){ QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); Message message = new Message("hello"); - sendClient.sendAsync(message).thenRunAsync(() -> - //System.out.printf("Done") - sendClient.closeAsync() -); + sendClient.sendAsync(message).thenRunAsync(() -> sendClient.closeAsync()); waitForEnter(10); - -// QueueClient receiveClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); -// // We are using single thread executor as we are only processing one message at a time -// ExecutorService executorService = Executors.newSingleThreadExecutor(); -// this.registerReceiver(receiveClient, executorService); -// -// // Create a QueueClient instance for sending and then asynchronously send messages. -// // Close the sender once the send operation is complete. -// QueueClient sendClient = new QueueClient(connStrBuilder, ReceiveMode.PEEKLOCK); -// this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync()); -//// Message message = new Message(GSON.toJson("Sent Message Via Proxy. ").getBytes(UTF_8)); -//// sendClient.sendAsync(message).thenRunAsync(sendClient::closeAsync); -// -// // wait for ENTER or 10 seconds elapsing -// waitForEnter(10); -// -// // shut down receiver to close the receive loop -// receiveClient.close(); -// executorService.shutdown(); - } - - CompletableFuture sendMessagesAsync(QueueClient sendClient) { - List> data = - GSON.fromJson( - "[" + - "{'name' = 'Einstein', 'firstName' = 'Albert'}," + - "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," + - "{'name' = 'Curie', 'firstName' = 'Marie'}," + - "{'name' = 'Hawking', 'firstName' = 'Steven'}," + - "{'name' = 'Newton', 'firstName' = 'Isaac'}," + - "{'name' = 'Bohr', 'firstName' = 'Niels'}," + - "{'name' = 'Faraday', 'firstName' = 'Michael'}," + - "{'name' = 'Galilei', 'firstName' = 'Galileo'}," + - "{'name' = 'Kepler', 'firstName' = 'Johannes'}," + - "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" + - "]", - new TypeToken>>() {}.getType()); - - List tasks = new ArrayList<>(); - for (int i = 0; i < data.size(); i++) { - final String messageId = Integer.toString(i); - Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8)); - message.setContentType("application/json"); - message.setLabel("Scientist"); - message.setMessageId(messageId); - message.setTimeToLive(Duration.ofMinutes(2)); - System.out.printf("\nMessage sending: Id = %s", message.getMessageId()); - tasks.add( - sendClient.sendAsync(message).thenRunAsync(() -> { - System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId()); - })); - } - return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[tasks.size()])); - } - - void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception { - - - // register the RegisterMessageHandler callback with executor service - queueClient.registerMessageHandler(new IMessageHandler() { - // callback invoked when the message handler loop has obtained a message - public CompletableFuture onMessageAsync(IMessage message) { - // receives message is passed to callback - if (message.getLabel() != null && - message.getContentType() != null && - message.getLabel().contentEquals("Scientist") && - message.getContentType().contentEquals("application/json")) { - - byte[] body = message.getBody(); - Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class); - - System.out.printf( - "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," + - "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n", - message.getMessageId(), - message.getSequenceNumber(), - message.getEnqueuedTimeUtc(), - message.getExpiresAtUtc(), - message.getContentType(), - scientist != null ? scientist.get("firstName") : "", - scientist != null ? scientist.get("name") : ""); - } - return CompletableFuture.completedFuture(null); - } - - // callback invoked when the message handler has an exception to report - public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) { - System.out.printf(exceptionPhase + "-" + throwable.getMessage()); - } - }, - // 1 concurrent call, messages are auto-completed, auto-renew duration - new MessageHandlerOptions(1, true, Duration.ofMinutes(1)), - executorService); - } private void waitForEnter(int seconds) { From 59ef0af035949ba3756b7d0b1782d2d339568baa Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Tue, 19 May 2020 15:52:03 -0700 Subject: [PATCH 6/8] get rid of unnecessary dependencies --- .../microsoft-azure-servicebus/pom.xml | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index fc4933b3a978..249366ae0977 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -87,29 +87,5 @@ 4.13-beta-3 test - - com.microsoft.azure - azure-servicebus - 2.0.0-PREVIEW-7 - test - - - com.squareup.retrofit2 - retrofit - 2.6.2 - test - - - com.google.guava - guava - 28.2-jre - test - - - ch.qos.logback - logback-core - 1.1.2 - test - From fca8dfd778cfe00fcbb67f4cba633350f4c9677a Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Tue, 19 May 2020 17:09:17 -0700 Subject: [PATCH 7/8] =?UTF-8?q?add=20setUseProxyProperties(true)=20to=20en?= =?UTF-8?q?able=20setting=20http.Proxy=C2=A0properties?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../azure/servicebus/management/ManagementClientAsync.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index f4ac9de2a755..baf0c83033fb 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -92,6 +92,7 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) .setUseProxySelector(true) + .setUseProxyProperties(true) .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); this.asyncHttpClient = asyncHttpClient(clientBuilder); } From 19302d2c1ffcf92a5a8e899aea05b04e29e5af1f Mon Sep 17 00:00:00 2001 From: DorothySun216 <55454966+DorothySun216@users.noreply.github.com> Date: Wed, 20 May 2020 14:04:22 -0700 Subject: [PATCH 8/8] ignore the test --- .../microsoft/azure/servicebus/ManagementClientProxyTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java index 23fe334665b0..9334240ae816 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/test/java/com/microsoft/azure/servicebus/ManagementClientProxyTest.java @@ -5,6 +5,7 @@ import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; import com.microsoft.azure.servicebus.primitives.TransportType; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; import java.net.*; @@ -18,6 +19,7 @@ import java.util.concurrent.Executors; public class ManagementClientProxyTest { + @Ignore @Test public void managementClientWithProxy() throws Exception { String proxyHostName = "127.0.0.1";