diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java index 22322e8f8b6a..fd9445d78757 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java @@ -14,14 +14,12 @@ public class WebSocketConnectionHandler extends ConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); - public WebSocketConnectionHandler(IAmqpConnection messagingFactory) - { + public WebSocketConnectionHandler(IAmqpConnection messagingFactory) { super(messagingFactory); } @Override - public void addTransportLayers(final Event event, final TransportInternal transport) - { + public void addTransportLayers(final Event event, final TransportInternal transport) { final String hostName = event.getConnection().getHostname(); final WebSocketImpl webSocket = new WebSocketImpl(); @@ -37,21 +35,19 @@ public void addTransportLayers(final Event event, final TransportInternal transp transport.addTransportLayer(webSocket); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]"); + TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName + "]"); } super.addTransportLayers(event, transport); } @Override - public int getProtocolPort() - { + public int getProtocolPort() { return ClientConstants.HTTPS_PORT; } @Override - public int getMaxFrameSize() - { + public int getMaxFrameSize() { // This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions // once, this library enables larger frames - this property can be removed. return AmqpConstants.WEBSOCKET_MAX_FRAME_SIZE; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/EntityNameHelper.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/EntityNameHelper.java index 66293b7bbfb1..892ebe1a9118 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/EntityNameHelper.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/EntityNameHelper.java @@ -7,14 +7,14 @@ * This class can be used to format the path for different Service Bus entity types. */ public class EntityNameHelper { - private static final String pathDelimiter = "/"; - private static final String subscriptionsSubPath = "Subscriptions"; - private static final String rulesSubPath = "Rules"; - private static final String subQueuePrefix = "$"; - private static final String deadLetterQueueSuffix = "DeadLetterQueue"; - private static final String deadLetterQueueName = subQueuePrefix + deadLetterQueueSuffix; - private static final String transfer = "Transfer"; - private static final String transferDeadLetterQueueName = subQueuePrefix + transfer + pathDelimiter + deadLetterQueueName; + private static final String PATH_DELIMITER = "/"; + private static final String SUBSCRIPTIONS_SUB_PATH = "Subscriptions"; + private static final String RULES_SUB_PATH = "Rules"; + private static final String SUB_QUEUE_PREFIX = "$"; + private static final String DEAD_LETTER_QUEUE_SUFFIX = "DeadLetterQueue"; + private static final String DEAD_LETTER_QUEUE_NAME = SUB_QUEUE_PREFIX + DEAD_LETTER_QUEUE_SUFFIX; + private static final String TRANSFER = "Transfer"; + private static final String TRANSFER_DEAD_LETTER_QUEUE_NAME = SUB_QUEUE_PREFIX + TRANSFER + PATH_DELIMITER + DEAD_LETTER_QUEUE_NAME; /** * Formats the dead letter path for either a queue, or a subscription. @@ -22,7 +22,7 @@ public class EntityNameHelper { * @return - The path as a String of the dead letter entity. */ public static String formatDeadLetterPath(String entityPath) { - return formatSubQueuePath(entityPath, deadLetterQueueName); + return formatSubQueuePath(entityPath, DEAD_LETTER_QUEUE_NAME); } /** @@ -32,7 +32,7 @@ public static String formatDeadLetterPath(String entityPath) { * @return The path of the subscription. */ public static String formatSubscriptionPath(String topicPath, String subscriptionName) { - return String.join(pathDelimiter, topicPath, subscriptionsSubPath, subscriptionName); + return String.join(PATH_DELIMITER, topicPath, SUBSCRIPTIONS_SUB_PATH, subscriptionName); } /** @@ -43,11 +43,11 @@ public static String formatSubscriptionPath(String topicPath, String subscriptio * @return The path of the rule */ public static String formatRulePath(String topicPath, String subscriptionName, String ruleName) { - return String.join(pathDelimiter, + return String.join(PATH_DELIMITER, topicPath, - subscriptionsSubPath, + SUBSCRIPTIONS_SUB_PATH, subscriptionName, - rulesSubPath, + RULES_SUB_PATH, ruleName); } @@ -57,11 +57,11 @@ public static String formatRulePath(String topicPath, String subscriptionName, S * @return The path of the transfer dead letter sub-queue for the entity */ public static String formatTransferDeadLetterPath(String entityPath) { - return String.join(pathDelimiter, entityPath, transferDeadLetterQueueName); + return String.join(PATH_DELIMITER, entityPath, TRANSFER_DEAD_LETTER_QUEUE_NAME); } static String formatSubQueuePath(String entityPath, String subQueueName) { - return entityPath + EntityNameHelper.pathDelimiter + subQueueName; + return entityPath + EntityNameHelper.PATH_DELIMITER + subQueueName; } static void checkValidQueueName(String queueName) { @@ -90,18 +90,18 @@ private static void checkValidEntityName(String entityName, int maxEntityNameLen throw new IllegalArgumentException("Entity path " + entityName + " exceeds the " + maxEntityNameLength + " character limit"); } - if (tempName.startsWith(pathDelimiter) || tempName.endsWith(pathDelimiter)) { + if (tempName.startsWith(PATH_DELIMITER) || tempName.endsWith(PATH_DELIMITER)) { throw new IllegalArgumentException("The entity name cannot contain '/' as prefix or suffix."); } - if (!allowSeparator && tempName.contains(pathDelimiter)) { - throw new IllegalArgumentException("The entity name contains an invalid character '" + pathDelimiter + "'"); + if (!allowSeparator && tempName.contains(PATH_DELIMITER)) { + throw new IllegalArgumentException("The entity name contains an invalid character '" + PATH_DELIMITER + "'"); } for (char key : ManagementClientConstants.InvalidEntityPathCharacters) { if (entityName.indexOf(key) >= 0) { - throw new IllegalArgumentException(entityName + " contains character '" + key + "' which is not allowed" + - "because it is reserved in the Uri scheme."); + throw new IllegalArgumentException(entityName + " contains character '" + key + "' which is not allowed" + + "because it is reserved in the Uri scheme."); } } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java index 6e9dcb068eb8..bd487267458b 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClient.java @@ -5,7 +5,9 @@ import com.microsoft.azure.servicebus.ClientSettings; import com.microsoft.azure.servicebus.Utils; -import com.microsoft.azure.servicebus.primitives.*; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.Util; import com.microsoft.azure.servicebus.rules.RuleDescription; import java.io.IOException; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java index c0f7681dde80..b5ed043930d5 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/ManagementClientAsync.java @@ -4,15 +4,33 @@ package com.microsoft.azure.servicebus.management; import com.microsoft.azure.servicebus.ClientSettings; -import com.microsoft.azure.servicebus.primitives.*; +import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException; +import com.microsoft.azure.servicebus.primitives.ClientConstants; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException; +import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException; +import com.microsoft.azure.servicebus.primitives.MessagingFactory; +import com.microsoft.azure.servicebus.primitives.QuotaExceededException; +import com.microsoft.azure.servicebus.primitives.ServerBusyException; +import com.microsoft.azure.servicebus.primitives.ServiceBusException; +import com.microsoft.azure.servicebus.primitives.Util; import com.microsoft.azure.servicebus.rules.RuleDescription; import com.microsoft.azure.servicebus.security.SecurityToken; import com.microsoft.azure.servicebus.security.TokenProvider; -import org.asynchttpclient.*; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Dsl; +import org.asynchttpclient.ListenableFuture; +import org.asynchttpclient.Request; +import org.asynchttpclient.RequestBuilder; +import org.asynchttpclient.Response; import org.asynchttpclient.util.HttpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.w3c.dom.*; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; import org.xml.sax.SAXException; import javax.xml.parsers.DocumentBuilder; @@ -72,8 +90,8 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett this.namespaceEndpointURI = namespaceEndpointURI; this.clientSettings = clientSettings; DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config() - .setConnectTimeout((int)CONNECTION_TIMEOUT.toMillis()) - .setRequestTimeout((int)this.clientSettings.getOperationTimeout().toMillis()); + .setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis()) + .setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis()); this.asyncHttpClient = asyncHttpClient(clientBuilder); } @@ -977,7 +995,7 @@ private CompletableFuture deleteEntityAsync(String path) { return exceptionFuture; } - return sendManagementHttpRequestAsync(HttpConstants.Methods.DELETE, entityURL, null, null).thenAccept(c -> {}); + return sendManagementHttpRequestAsync(HttpConstants.Methods.DELETE, entityURL, null, null).thenAccept(c -> { }); } /** @@ -990,7 +1008,7 @@ public void close() throws IOException { private static URL getManagementURL(URI namespaceEndpontURI, String entityPath, String query) throws ServiceBusException { try { - URI httpURI = new URI("https", null, namespaceEndpontURI.getHost(), getPortNumberFromHost(namespaceEndpontURI.getHost()), "/"+entityPath, query, null); + URI httpURI = new URI("https", null, namespaceEndpontURI.getHost(), getPortNumberFromHost(namespaceEndpontURI.getHost()), "/" + entityPath, query, null); return httpURI.toURL(); } catch (URISyntaxException | MalformedURLException e) { throw new ServiceBusException(false, e); @@ -1027,8 +1045,7 @@ private CompletableFuture sendManagementHttpRequestAsync(String httpMeth CompletableFuture outputFuture = new CompletableFuture<>(); listenableFuture.toCompletableFuture() - .handleAsync((response, ex) -> - { + .handleAsync((response, ex) -> { if (ex != null) { outputFuture.completeExceptionally(ex); } else { @@ -1057,8 +1074,7 @@ private static void validateHttpResponse(Request request, Response response) thr } ServiceBusException exception = null; - switch (response.getStatusCode()) - { + switch (response.getStatusCode()) { case 401: /*UnAuthorized*/ exception = new AuthorizationFailedException(exceptionMessage); break; @@ -1092,8 +1108,7 @@ private static void validateHttpResponse(Request request, Response response) thr if (exceptionMessage.contains(ManagementClientConstants.ForbiddenInvalidOperationSubCode)) { //todo: log throw new UnsupportedOperationException(exceptionMessage); - } - else { + } else { exception = new QuotaExceededException(exceptionMessage); } break; @@ -1145,18 +1160,15 @@ private static String parseDetailIfAvailable(String content) { return null; } - private static String getSecurityToken(TokenProvider tokenProvider, String url ) throws InterruptedException, ExecutionException { + private static String getSecurityToken(TokenProvider tokenProvider, String url) throws InterruptedException, ExecutionException { SecurityToken token = tokenProvider.getSecurityTokenAsync(url).get(); return token.getTokenValue(); } private static int getPortNumberFromHost(String host) { - if(host.endsWith("onebox.windows-int.net")) - { + if (host.endsWith("onebox.windows-int.net")) { return ONE_BOX_HTTPS_PORT; - } - else - { + } else { return -1; } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueRuntimeInfo.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueRuntimeInfo.java index 56fd34ec18b1..13977722f195 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueRuntimeInfo.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueRuntimeInfo.java @@ -11,8 +11,7 @@ public class QueueRuntimeInfo extends EntityRuntimeInfo { private long messageCount; private long sizeInBytes; - QueueRuntimeInfo(String path) - { + QueueRuntimeInfo(String path) { this.path = path; } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SerializerUtil.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SerializerUtil.java index d7df43cbbaba..6f92edbc93b6 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SerializerUtil.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SerializerUtil.java @@ -8,40 +8,33 @@ // Fields that require special serializations public class SerializerUtil { - public static String serializeDuration(Duration duration) - { - if(duration == null || duration.isNegative() || duration.isZero()) - { + public static String serializeDuration(Duration duration) { + if (duration == null || duration.isNegative() || duration.isZero()) { return ""; } Duration remainingTime = duration; StringBuffer sb = new StringBuffer("P"); long days = remainingTime.toDays(); - if(days > 0) - { + if (days > 0) { sb.append(days).append("D"); remainingTime = duration.minusDays(days); } - if(!remainingTime.isZero()) - { + if (!remainingTime.isZero()) { sb.append("T"); long hours = remainingTime.toHours(); - if(hours > 0) - { + if (hours > 0) { sb.append(hours).append("H"); remainingTime = duration.minusHours(hours); } long minutes = remainingTime.toMinutes(); - if(minutes > 0) - { + if (minutes > 0) { sb.append(minutes).append("M"); remainingTime = duration.minusMinutes(minutes); } long seconds = remainingTime.getSeconds(); - if(seconds > 0) - { + if (seconds > 0) { sb.append(seconds).append("S"); } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SharedAccessAuthorizationRule.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SharedAccessAuthorizationRule.java index 5498b99c808c..f24d8ba20dc3 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SharedAccessAuthorizationRule.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SharedAccessAuthorizationRule.java @@ -6,11 +6,13 @@ import com.microsoft.azure.servicebus.security.SecurityConstants; import java.security.SecureRandom; -import java.util.*; +import java.util.Base64; +import java.util.HashSet; +import java.util.List; public class SharedAccessAuthorizationRule extends AuthorizationRule { - static int SUPPORTED_SAS_KEY_LENGTH = 44; - static String FIXED_CLAIM_TYPE = "SharedAccessKey"; + static final int SUPPORTED_SAS_KEY_LENGTH = 44; + static final String FIXED_CLAIM_TYPE = "SharedAccessKey"; private String keyName; private String primaryKey; @@ -37,7 +39,7 @@ public SharedAccessAuthorizationRule(String keyName, String primaryKey, String s @Override public String getClaimType() { - return SharedAccessAuthorizationRule.FIXED_CLAIM_TYPE; + return SharedAccessAuthorizationRule.FIXED_CLAIM_TYPE ; } @Override @@ -135,8 +137,8 @@ public boolean equals(Object o) { return false; } - if ((this.rights != null && other.rights == null) || - this.rights == null && other.rights != null) { + if ((this.rights != null && other.rights == null) + || this.rights == null && other.rights != null) { return false; } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java index 708fc4918f93..a26e5615c33a 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java @@ -36,8 +36,7 @@ public class SubscriptionDescription { * @param subscriptionName - Name of the subscription * Max length is 50 chars. Cannot have restricted characters: '@','?','#','*','/' */ - public SubscriptionDescription(String topicPath, String subscriptionName) - { + public SubscriptionDescription(String topicPath, String subscriptionName) { this.setTopicPath(topicPath); this.setSubscriptionName(subscriptionName); } @@ -87,8 +86,7 @@ public String getPath() { * so that no other receiver receives the same message. * @return The duration of a peek lock. Default value is 60 seconds. */ - public Duration getLockDuration() - { + public Duration getLockDuration() { return this.lockDuration; } @@ -97,8 +95,7 @@ public Duration getLockDuration() * so that no other receiver receives the same message. * @param lockDuration - The duration of a peek lock. Max value is 5 minutes. */ - public void setLockDuration(Duration lockDuration) - { + public void setLockDuration(Duration lockDuration) { this.lockDuration = lockDuration; if (this.lockDuration.compareTo(ManagementClientConstants.MAX_DURATION) > 0) { this.lockDuration = ManagementClientConstants.MAX_DURATION; @@ -137,10 +134,9 @@ public Duration getDefaultMessageTimeToLive() { * See {@link #getDefaultMessageTimeToLive()} */ public void setDefaultMessageTimeToLive(Duration defaultMessageTimeToLive) { - if (defaultMessageTimeToLive != null && - (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 || - defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) - { + if (defaultMessageTimeToLive != null + && (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 + || defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) { throw new IllegalArgumentException( String.format("The value must be between %s and %s.", ManagementClientConstants.MAX_ALLOWED_TTL, @@ -163,9 +159,8 @@ public Duration getAutoDeleteOnIdle() { * The minimum duration is 5 minutes. */ public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) { - if (autoDeleteOnIdle != null && - autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) - { + if (autoDeleteOnIdle != null + && autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) { throw new IllegalArgumentException( String.format("The value must be greater than %s.", ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION)); @@ -227,8 +222,7 @@ public int getMaxDeliveryCount() { * @param maxDeliveryCount - Minimum value is 1. */ public void setMaxDeliveryCount(int maxDeliveryCount) { - if (maxDeliveryCount < ManagementClientConstants.MIN_ALLOWED_MAX_DELIVERYCOUNT) - { + if (maxDeliveryCount < ManagementClientConstants.MIN_ALLOWED_MAX_DELIVERYCOUNT) { throw new IllegalArgumentException( String.format("The value must be greater than %s.", ManagementClientConstants.MIN_ALLOWED_MAX_DELIVERYCOUNT)); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/AuthorizationFailedException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/AuthorizationFailedException.java index 71e86ca75af7..922dc6e0fbdf 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/AuthorizationFailedException.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/AuthorizationFailedException.java @@ -10,12 +10,10 @@ * @see http://go.microsoft.com/fwlink/?LinkId=761101 * @since 1.0 */ -public class AuthorizationFailedException extends ServiceBusException -{ +public class AuthorizationFailedException extends ServiceBusException { private static final long serialVersionUID = 5384872132102860710L; - AuthorizationFailedException() - { + AuthorizationFailedException() { super(false); } @@ -23,18 +21,15 @@ public class AuthorizationFailedException extends ServiceBusException * Constructor for the exception class * @param message the actual error message detailing the reason for the failure */ - public AuthorizationFailedException(final String message) - { + public AuthorizationFailedException(final String message) { super(false, message); } - AuthorizationFailedException(final Throwable cause) - { + AuthorizationFailedException(final Throwable cause) { super(false, cause); } - AuthorizationFailedException(final String message, final Throwable cause) - { + AuthorizationFailedException(final String message, final Throwable cause) { super(false, message, cause); } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index 63ebe375ab5e..f9fa63e275ce 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -3,28 +3,26 @@ package com.microsoft.azure.servicebus.primitives; -import java.io.IOException; -import java.time.*; +import java.time.Duration; import java.util.Properties; import java.util.UUID; -import org.apache.qpid.proton.amqp.*; - import com.microsoft.azure.servicebus.amqp.AmqpConstants; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class ClientConstants -{ - final static String END_POINT_FORMAT = "amqps://%s.servicebus.windows.net"; +public final class ClientConstants { + static final String END_POINT_FORMAT = "amqps://%s.servicebus.windows.net"; private ClientConstants() { } private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ClientConstants.class); public static final String FATAL_MARKER = "FATAL"; - public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = getClientVersion(); + public static final String PRODUCT_NAME = "MSJavaClient"; + public static final String CURRENT_JAVACLIENT_VERSION = getClientVersion(); public static final String PLATFORM_INFO = getPlatformInfo(); public static final int DEFAULT_OPERATION_TIMEOUT_IN_SECONDS = 30; @@ -38,23 +36,23 @@ private ClientConstants() { } public static final String PARTITIONKEYNAME = "x-opt-partition-key"; public static final String VIAPARTITIONKEYNAME = "x-opt-via-partition-key"; public static final String DEADLETTERSOURCENAME = "x-opt-deadletter-source"; - public static final UUID ZEROLOCKTOKEN = new UUID(0l, 0l); - - public final static int AMQPS_PORT = 5671; - public final static int HTTPS_PORT = 443; - public final static int MAX_PARTITION_KEY_LENGTH = 128; - - public final static Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy"); - public final static Symbol ARGUMENT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-error"); - public final static Symbol ARGUMENT_OUT_OF_RANGE_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-out-of-range"); - public final static Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled"); - public final static Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned"); - public final static Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); - public final static Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); - public final static Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); - public final static Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":transfer-destination-address"); - public final static Symbol LINK_PEEKMODE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":peek-mode"); - public final static Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); + public static final UUID ZEROLOCKTOKEN = new UUID(0L, 0L); + + public static final int AMQPS_PORT = 5671; + public static final int HTTPS_PORT = 443; + public static final int MAX_PARTITION_KEY_LENGTH = 128; + + public static final Symbol SERVER_BUSY_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":server-busy"); + public static final Symbol ARGUMENT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-error"); + public static final Symbol ARGUMENT_OUT_OF_RANGE_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":argument-out-of-range"); + public static final Symbol ENTITY_DISABLED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-disabled"); + public static final Symbol PARTITION_NOT_OWNED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":partition-not-owned"); + public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); + public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":transfer-destination-address"); + public static final Symbol LINK_PEEKMODE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":peek-mode"); + public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final Symbol DEADLETTERNAME = Symbol.valueOf(AmqpConstants.VENDOR + ":dead-letter"); public static final Symbol MESSAGE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":message-lock-lost"); public static final Symbol SESSION_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-lock-lost"); @@ -64,7 +62,7 @@ private ClientConstants() { } public static final Symbol ENTITY_ALREADY_EXISTS_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-already-exists"); public static final Symbol SESSION_FILTER = Symbol.getSymbol(AmqpConstants.VENDOR + ":session-filter"); public static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol(AmqpConstants.VENDOR + ":locked-until-utc"); - public final static Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-type"); + public static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":entity-type"); public static final String DEADLETTER_REASON_HEADER = "DeadLetterReason"; public static final String DEADLETTER_ERROR_DESCRIPTION_HEADER = "DeadLetterErrorDescription"; @@ -73,20 +71,20 @@ private ClientConstants() { } public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024; public static final int MAX_MESSAGING_AMQP_HEADER_SIZE_BYTES = 512; - public final static Duration TIMER_TOLERANCE = Duration.ofSeconds(1); + public static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1); - public final static Duration DEFAULT_RERTRY_MIN_BACKOFF = Duration.ofSeconds(0); - public final static Duration DEFAULT_RERTRY_MAX_BACKOFF = Duration.ofSeconds(30); + public static final Duration DEFAULT_RERTRY_MIN_BACKOFF = Duration.ofSeconds(0); + public static final Duration DEFAULT_RERTRY_MAX_BACKOFF = Duration.ofSeconds(30); - public final static int DEFAULT_MAX_RETRY_COUNT = 10; + public static final int DEFAULT_MAX_RETRY_COUNT = 10; - public final static boolean DEFAULT_IS_TRANSIENT = true; + public static final boolean DEFAULT_IS_TRANSIENT = true; - public final static int REACTOR_IO_POLL_TIMEOUT = 20; - public final static int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4; + public static final int REACTOR_IO_POLL_TIMEOUT = 20; + public static final int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4; - public final static String NO_RETRY = "NoRetry"; - public final static String DEFAULT_RETRY = "Default"; + public static final String NO_RETRY = "NoRetry"; + public static final String DEFAULT_RETRY = "Default"; public static final String REQUEST_RESPONSE_OPERATION_NAME = "operation"; public static final String REQUEST_RESPONSE_TIMEOUT = AmqpConstants.VENDOR + ":server-timeout"; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java index b41bab8f2431..d0886843f906 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java @@ -13,103 +13,77 @@ * Internal-class * @since 1.0 */ -public abstract class ClientEntity -{ +public abstract class ClientEntity { private final String clientId; private final Object syncClose; private boolean isClosing; private boolean isClosed; - protected ClientEntity(final String clientId) - { + protected ClientEntity(final String clientId) { this.clientId = clientId; this.syncClose = new Object(); } protected abstract CompletableFuture onClose(); - public String getClientId() - { + public String getClientId() { return this.clientId; } - protected boolean getIsClosed() - { - synchronized (this.syncClose) - { + protected boolean getIsClosed() { + synchronized (this.syncClose) { return this.isClosed; } } - protected boolean getIsClosingOrClosed() - { - - synchronized (this.syncClose) - { + protected boolean getIsClosingOrClosed() { + synchronized (this.syncClose) { return this.isClosing || this.isClosed; } } // used to force close when entity is faulted - protected final void setClosed() - { - synchronized (this.syncClose) - { + protected final void setClosed() { + synchronized (this.syncClose) { this.isClosed = true; } } - public final CompletableFuture closeAsync() - { - if(this.getIsClosingOrClosed()) - { + public final CompletableFuture closeAsync() { + if (this.getIsClosingOrClosed()) { return CompletableFuture.completedFuture(null); } - synchronized (this.syncClose) - { + synchronized (this.syncClose) { this.isClosing = true; } - return this.onClose().thenRunAsync(new Runnable() - { - @Override - public void run() - { - synchronized (ClientEntity.this.syncClose) - { - ClientEntity.this.isClosing = false; - ClientEntity.this.isClosed = true; - } - }}, MessagingFactory.INTERNAL_THREAD_POOL); + return this.onClose().thenRunAsync(() -> { + synchronized (ClientEntity.this.syncClose) { + ClientEntity.this.isClosing = false; + ClientEntity.this.isClosed = true; + } + }, MessagingFactory.INTERNAL_THREAD_POOL); } - public final void close() throws ServiceBusException - { - try - { + public final void close() throws ServiceBusException { + try { this.closeAsync().get(); - } - catch (InterruptedException|ExecutionException exception) - { - if (exception instanceof InterruptedException) - { + } catch (InterruptedException | ExecutionException exception) { + if (exception instanceof InterruptedException) { // Re-assert the thread's interrupted status Thread.currentThread().interrupt(); } Throwable throwable = exception.getCause(); - if (throwable != null) - { - if (throwable instanceof RuntimeException) - { - throw (RuntimeException)throwable; + if (throwable != null) { + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; } - if (throwable instanceof ServiceBusException) - { - throw (ServiceBusException)throwable; + if (throwable instanceof ServiceBusException) { + throw (ServiceBusException) throwable; } throw new ServiceBusException(true, throwable); @@ -117,20 +91,16 @@ public final void close() throws ServiceBusException } } - protected final void throwIfClosed(Throwable cause) - { - if (this.getIsClosingOrClosed()) - { + protected final void throwIfClosed(Throwable cause) { + if (this.getIsClosingOrClosed()) { throw new IllegalStateException(String.format(Locale.US, "Operation not allowed after the %s instance is closed.", this.getClass().getName()), cause); } } @Override - protected void finalize() - { + protected void finalize() { try { - if(!this.isClosed) - { + if (!this.isClosed) { this.close(); } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommonRequestResponseOperations.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommonRequestResponseOperations.java index dd4a6bc4a0ff..2dcedf962d62 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommonRequestResponseOperations.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommonRequestResponseOperations.java @@ -22,14 +22,12 @@ final class CommonRequestResponseOperations { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CommonRequestResponseOperations.class); - static CompletableFuture> peekMessagesAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, long fromSequenceNumber, int messageCount, String sessionId, String associatedLinkName) - { + static CompletableFuture> peekMessagesAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, long fromSequenceNumber, int messageCount, String sessionId, String associatedLinkName) { TRACE_LOGGER.debug("Peeking '{}' messages from sequence number '{}' in entity '{}', sessionId '{}'", messageCount, fromSequenceNumber, requestResponseLink.getLinkPath(), sessionId); HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_FROM_SEQUENCE_NUMER, fromSequenceNumber); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_MESSAGE_COUNT, messageCount); - if(sessionId != null) - { + if (sessionId != null) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, sessionId); } Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_PEEK_OPERATION, requestBodyMap, Util.adjustServerTimeout(operationTimeout), associatedLinkName); @@ -37,21 +35,16 @@ static CompletableFuture> peekMessagesAsync(RequestResponseL return responseFuture.thenComposeAsync((responseMessage) -> { CompletableFuture> returningFuture = new CompletableFuture>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { List peekedMessages = new ArrayList(); - Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue(); - if(responseBodyMap != null && responseBodyMap instanceof Map) - { - Object messages = ((Map)responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES); - if(messages != null && messages instanceof Iterable) - { - for(Object message : (Iterable)messages) - { - if(message instanceof Map) - { + Object responseBodyMap = ((AmqpValue) responseMessage.getBody()).getValue(); + if (responseBodyMap != null && responseBodyMap instanceof Map) { + Object messages = ((Map) responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES); + if (messages != null && messages instanceof Iterable) { + for (Object message : (Iterable) messages) { + if (message instanceof Map) { Message peekedMessage = Message.Factory.create(); - Binary messagePayLoad = (Binary)((Map)message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE); + Binary messagePayLoad = (Binary) ((Map) message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE); peekedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength()); peekedMessages.add(peekedMessage); } @@ -60,15 +53,11 @@ static CompletableFuture> peekMessagesAsync(RequestResponseL } TRACE_LOGGER.debug("Peeked '{}' messages from sequence number '{}' in entity '{}', sessionId '{}'", peekedMessages.size(), fromSequenceNumber, requestResponseLink.getLinkPath(), sessionId); returningFuture.complete(peekedMessages); - } - else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE || - (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.MESSAGE_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) - { + } else if (statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE + || (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.MESSAGE_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) { TRACE_LOGGER.debug("Peek from sequence number '{}' in entity '{}', sessionId '{}' didnot find any messages", fromSequenceNumber, requestResponseLink.getLinkPath(), sessionId); - returningFuture.complete(new ArrayList()); - } - else - { + returningFuture.complete(new ArrayList<>()); + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Peeking messages from sequence number '{}' in entity '{}', sessionId '{}' failed", fromSequenceNumber, requestResponseLink.getLinkPath(), sessionId, failureException); @@ -78,23 +67,19 @@ else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE || }, MessagingFactory.INTERNAL_THREAD_POOL); } - static CompletableFuture sendCBSTokenAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, SecurityToken securityToken) - { + static CompletableFuture sendCBSTokenAsync(RequestResponseLink requestResponseLink, Duration operationTimeout, SecurityToken securityToken) { TRACE_LOGGER.debug("Sending CBS Token of type '{}' to '{}'", securityToken.getTokenType(), securityToken.getTokenAudience()); Message requestMessage = RequestResponseUtils.createRequestMessageFromValueBody(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_OPERATION, securityToken.getTokenValue(), Util.adjustServerTimeout(operationTimeout)); requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_TYPE, securityToken.getTokenType().toString()); requestMessage.getApplicationProperties().getValue().put(ClientConstants.REQUEST_RESPONSE_PUT_TOKEN_AUDIENCE, securityToken.getTokenAudience()); CompletableFuture responseFuture = requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture returningFuture = new CompletableFuture(); + CompletableFuture returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE || statusCode == ClientConstants.REQUEST_RESPONSE_ACCEPTED_STATUS_CODE) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE || statusCode == ClientConstants.REQUEST_RESPONSE_ACCEPTED_STATUS_CODE) { TRACE_LOGGER.debug("CBS Token of type '{}' sent to '{}'", securityToken.getTokenType(), securityToken.getTokenAudience()); returningFuture.complete(null); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Sending CBS Token to '{}' failed", securityToken.getTokenAudience());