Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/microsoft-azure-servicebus/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,11 @@
<version>4.13-beta-3</version> <!-- {x-version-update;junit:junit;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.8</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, our versioning story is managed by a series of scripts to keep dependency versions aligned. Can you add this annotation and follow the instructions to update to the proper version?
https://github.com/Azure/azure-sdk-for-java/blob/master/CONTRIBUTING.md#versions-and-versioning

<!-- {x-version-update;org.apache.httpcomponents:httpclient;external_dependency} -->

<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@

package com.microsoft.azure.servicebus.management;

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.Utils;
import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException;
import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException;
import com.microsoft.azure.servicebus.primitives.QuotaExceededException;
import com.microsoft.azure.servicebus.primitives.ServerBusyException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Star imports are not allowed. You may have to update IntelliJ to not change it to a star import.

import com.microsoft.azure.servicebus.primitives.*;
import com.microsoft.azure.servicebus.rules.RuleDescription;

import java.io.IOException;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Synchronous client to perform management operations on Service Bus entities.
Expand All @@ -27,14 +20,18 @@
public class ManagementClient {
private ManagementClientAsync asyncClient;

public ManagementClient(ConnectionStringBuilder connectionStringBuilder) {
public ManagementClient(ConnectionStringBuilder connectionStringBuilder) throws InterruptedException, ServiceBusException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is an existing public-facing API, this will break existing customers. You could catch the checked exceptions and throw an unchecked one to main compat.

this(connectionStringBuilder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(connectionStringBuilder));
}

public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings) {
public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
this.asyncClient = new ManagementClientAsync(namespaceEndpointURI, clientSettings);
}

public ManagementClient(URI namespaceEndpointURI, ClientSettings clientSettings, DefaultAsyncHttpClientConfig.Builder httpClientBuilder) {
this.asyncClient = new ManagementClientAsync(namespaceEndpointURI, clientSettings, httpClientBuilder);
}

/**
* Retrieves information related to the namespace.
* Works with any claim (Send/Listen/Manage).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,8 @@

package com.microsoft.azure.servicebus.management;

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException;
import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.QuotaExceededException;
import com.microsoft.azure.servicebus.primitives.ServerBusyException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.*;
import com.microsoft.azure.servicebus.rules.RuleDescription;
import com.microsoft.azure.servicebus.security.SecurityToken;
import com.microsoft.azure.servicebus.security.TokenProvider;
Expand All @@ -24,6 +15,7 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.asynchttpclient.proxy.ProxyServer;
import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,10 +30,7 @@
import javax.xml.parsers.ParserConfigurationException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.*;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove star imports.

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
Expand All @@ -68,15 +57,18 @@ public class ManagementClientAsync {
private static final String USER_AGENT = String.format("%s/%s(%s)", ClientConstants.PRODUCT_NAME, ClientConstants.CURRENT_JAVACLIENT_VERSION, ClientConstants.PLATFORM_INFO);

private ClientSettings clientSettings;
private MessagingFactory factory;
private URI namespaceEndpointURI;
private AsyncHttpClient asyncHttpClient;
private MiscRequestResponseOperationHandler miscRequestResponseHandler;
private List<Proxy> proxies;

/**
* Creates a new {@link ManagementClientAsync}.
* User should call {@link ManagementClientAsync#close()} at the end of life of the client.
* @param connectionStringBuilder - connectionStringBuilder containing namespace information and client settings.
*/
public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) {
public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) throws InterruptedException, ServiceBusException {
this(connectionStringBuilder.getEndpoint(), Util.getClientSettingsFromConnectionStringBuilder(connectionStringBuilder));
}

Expand All @@ -86,14 +78,93 @@ public ManagementClientAsync(ConnectionStringBuilder connectionStringBuilder) {
* @param namespaceEndpointURI - URI of the namespace connecting to.
* @param clientSettings - client settings.
*/
public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) {
// public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove dead code.

// this.namespaceEndpointURI = namespaceEndpointURI;
// this.clientSettings = clientSettings;
// DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config()
// .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis())
// .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis());
// this.asyncHttpClient = asyncHttpClient(clientBuilder);
// }

public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings, DefaultAsyncHttpClientConfig.Builder httpClientBuilder) {
this.namespaceEndpointURI = namespaceEndpointURI;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider any null checks to avoid inadvertent NPEs? (ie. Objects.requireNotNull)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with the other public constructors.

this.clientSettings = clientSettings;
this.asyncHttpClient = asyncHttpClient(httpClientBuilder);
}

public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
this.namespaceEndpointURI = namespaceEndpointURI;
this.clientSettings = clientSettings;
DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config()
.setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis())
.setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis());
.setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis())
.setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis());

if(shouldUseProxy(this.namespaceEndpointURI.getHost())){
InetSocketAddress address = (InetSocketAddress)this.proxies.get(0).address();
String proxyHostName = address.getHostName();
int proxyPort = address.getPort();
clientBuilder.setProxyServer(new ProxyServer.Builder(proxyHostName, proxyPort));
}

this.asyncHttpClient = asyncHttpClient(clientBuilder);
// CompletableFuture<MessagingFactory> factoryFuture = MessagingFactory.createFromNamespaceEndpointURIAsync(namespaceEndpointURI, clientSettings);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove all commented out code. It'll keep the code base clean.

// Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f), MessagingFactory.INTERNAL_THREAD_POOL));
// if (TRACE_LOGGER.isInfoEnabled()) {
// TRACE_LOGGER.info("Created management client '{}'", namespaceEndpointURI.toString());
// }
}

public boolean shouldUseProxy(final String hostName) {
final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT);
final ProxySelector proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
return false;
}

final List<Proxy> proxies = proxySelector.select(uri);
if (isProxyAddressLegal(proxies)) {
this.proxies = proxies;
return true;
} else {
return false;
}
}

private static URI createURIFromHostNamePort(final String hostName, final int port) {
return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port));
}

private static boolean isProxyAddressLegal(final List<Proxy> proxies) {
// only checks the first proxy in the list
// returns true if it is an InetSocketAddress, which is required for qpid-proton-j library
return proxies != null
&& !proxies.isEmpty()
&& proxies.get(0).type() == Proxy.Type.HTTP
&& proxies.get(0).address() != null
&& proxies.get(0).address() instanceof InetSocketAddress;
}
private CompletableFuture<Void> createInternals(MessagingFactory factory) {
this.factory = factory;

CompletableFuture<Void> postSessionBrowserFuture = MiscRequestResponseOperationHandler.create(factory).thenAcceptAsync((msoh) -> {
this.miscRequestResponseHandler = msoh;
//this.sessionBrowser = new SessionBrowser(factory, queuePath, MessagingEntityType.QUEUE, msoh);
}, MessagingFactory.INTERNAL_THREAD_POOL);

return CompletableFuture.allOf(postSessionBrowserFuture);
}

// No op now
// @Override
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove ded code

// CompletableFuture<Void> initializeAsync() {
// return CompletableFuture.completedFuture(null);
// }
//
// @Override
// protected CompletableFuture<Void> onClose() {
// return this.miscRequestResponseHandler.closeAsync().thenCompose((w) -> this.factory.closeAsync());
// }

/**
* Retrieves information related to the namespace.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

public final class MiscRequestResponseOperationHandler extends ClientEntity {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MiscRequestResponseOperationHandler.class);

private final Object requestResonseLinkCreationLock = new Object();
private final String entityPath;
private final MessagingEntityType entityType;
Expand All @@ -37,6 +37,10 @@ private MiscRequestResponseOperationHandler(MessagingFactory factory, String lin
}

@Deprecated
public static CompletableFuture<MiscRequestResponseOperationHandler> create(MessagingFactory factory) {
return create(factory, null, null);
}

public static CompletableFuture<MiscRequestResponseOperationHandler> create(MessagingFactory factory, String entityPath) {
return create(factory, entityPath, null);
}
Expand Down Expand Up @@ -76,7 +80,7 @@ private CompletableFuture<Void> createRequestResponseLink() {
return null;
}, MessagingFactory.INTERNAL_THREAD_POOL);
}

return this.requestResponseLinkCreationFuture;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ public abstract class ClientSessionTests extends Tests {
private String receiveEntityPath;
private IMessageSender sendClient;
private IMessageAndSessionPump receiveClient;

@BeforeClass
public static void init() {
public static void init() throws ServiceBusException, InterruptedException {
ClientSessionTests.entityNameCreatedForAllTests = null;
ClientSessionTests.receiveEntityPathForAllTest = null;
URI namespaceEndpointURI = TestUtils.getNamespaceEndpointURI();
ClientSettings managementClientSettings = TestUtils.getManagementClientSettings();
managementClient = new ManagementClientAsync(namespaceEndpointURI, managementClientSettings);
}

@Before
public void setup() throws InterruptedException, ExecutionException {
if (this.shouldCreateEntityForEveryTest() || ClientSessionTests.entityNameCreatedForAllTests == null) {
Expand Down Expand Up @@ -72,7 +72,7 @@ public void setup() throws InterruptedException, ExecutionException {
this.receiveEntityPath = ClientSessionTests.receiveEntityPathForAllTest;
}
}

@After
public void tearDown() throws ServiceBusException, InterruptedException, ExecutionException {
if (this.sendClient != null) {
Expand All @@ -85,14 +85,14 @@ public void tearDown() throws ServiceBusException, InterruptedException, Executi
((QueueClient) this.receiveClient).close();
}
}

if (this.shouldCreateEntityForEveryTest()) {
managementClient.deleteQueueAsync(this.entityName).get();
} else {
TestCommons.drainAllSessions(this.receiveEntityPath, this.isEntityQueue());
}
}

@AfterClass
public static void cleanupAfterAllTest() throws ExecutionException, InterruptedException, IOException {
if (managementClient != null) {
Expand All @@ -103,7 +103,7 @@ public static void cleanupAfterAllTest() throws ExecutionException, InterruptedE
managementClient.close();
}
}

private void createClients(ReceiveMode receiveMode) throws InterruptedException, ServiceBusException {
if (this.isEntityQueue()) {
this.sendClient = new QueueClient(TestUtils.getNamespaceEndpointURI(), this.entityName, TestUtils.getClientSettings(), receiveMode);
Expand All @@ -113,49 +113,49 @@ private void createClients(ReceiveMode receiveMode) throws InterruptedException,
this.receiveClient = new SubscriptionClient(TestUtils.getNamespaceEndpointURI(), this.receiveEntityPath, TestUtils.getClientSettings(), receiveMode);
}
}

@Test
public void testRegisterAnotherHandlerAfterSessionHandler() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testRegisterAnotherHandlerAfterSessionHandler(this.receiveClient);
}

@Test
public void testGetMessageSessions() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
TestCommons.testGetMessageSessions(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpAutoCompleteWithOneConcurrentCallPerSession() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithOneConcurrentCallPerSession(this.sendClient, this.receiveClient);
}

@Test
public void testReceiveAndDeleteSessionPump() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.RECEIVEANDDELETE);
MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithOneConcurrentCallPerSession(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpAutoCompleteWithMultipleConcurrentCallsPerSession() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpAutoCompleteWithMultipleConcurrentCallsPerSession(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpClientComplete() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpClientComplete(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpAbandonOnException() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
MessageAndSessionPumpTests.testSessionPumpAbandonOnException(this.sendClient, this.receiveClient);
}

@Test
public void testSessionPumpRenewLock() throws InterruptedException, ServiceBusException {
this.createClients(ReceiveMode.PEEKLOCK);
Expand Down
Loading