diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommunicationException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommunicationException.java
index ac1bc9cee441..1913fdb15941 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommunicationException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CommunicationException.java
@@ -15,27 +15,22 @@
* @see http://go.microsoft.com/fwlink/?LinkId=761101
* @since 1.0
*/
-public class CommunicationException extends ServiceBusException
-{
+public class CommunicationException extends ServiceBusException {
private static final long serialVersionUID = 7968596830506494332L;
- CommunicationException()
- {
+ CommunicationException() {
super(true);
}
- CommunicationException(final String message)
- {
+ CommunicationException(final String message) {
super(true, message);
}
- CommunicationException(final Throwable cause)
- {
+ CommunicationException(final Throwable cause) {
super(true, cause);
}
- CommunicationException(final String message, final Throwable cause)
- {
+ CommunicationException(final String message, final Throwable cause) {
super(true, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java
index cb42b205c5a8..9453d890f17a 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ConnectionStringBuilder.java
@@ -3,11 +3,13 @@
package com.microsoft.azure.servicebus.primitives;
-import java.net.*;
-import java.time.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
import java.time.format.DateTimeParseException;
-import java.util.*;
-import java.util.regex.*;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* This class can be used to construct a connection string which can establish communication with ServiceBus entities.
@@ -37,26 +39,25 @@
* @since 1.0
*/
-public class ConnectionStringBuilder
-{
- private final static String END_POINT_RAW_FORMAT = "amqps://%s";
-
- private final static String HOSTNAME_CONFIG_NAME = "Hostname";
- private final static String ENDPOINT_CONFIG_NAME = "Endpoint";
- private final static String SHARED_ACCESS_KEY_NAME_CONFIG_NAME = "SharedAccessKeyName";
- private final static String SHARED_ACCESS_KEY_CONFIG_NAME = "SharedAccessKey";
- private final static String ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignature";
- private final static String SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignatureToken";
- private final static String TRANSPORT_TYPE_CONFIG_NAME = "TransportType";
- private final static String ENTITY_PATH_CONFIG_NAME = "EntityPath";
- private final static String OPERATION_TIMEOUT_CONFIG_NAME = "OperationTimeout";
- private final static String RETRY_POLICY_CONFIG_NAME = "RetryPolicy";
- private final static String KEY_VALUE_SEPARATOR = "=";
- private final static String KEY_VALUE_PAIR_DELIMITER = ";";
-
- private static final String ALL_KEY_ENUMERATE_REGEX = "(" + HOSTNAME_CONFIG_NAME + "|" + ENDPOINT_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_NAME_CONFIG_NAME
- + "|" + SHARED_ACCESS_KEY_CONFIG_NAME + "|" + SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + ENTITY_PATH_CONFIG_NAME + "|" + OPERATION_TIMEOUT_CONFIG_NAME
- + "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + TRANSPORT_TYPE_CONFIG_NAME + "|" +")";
+public class ConnectionStringBuilder {
+ private static final String END_POINT_RAW_FORMAT = "amqps://%s";
+
+ private static final String HOSTNAME_CONFIG_NAME = "Hostname";
+ private static final String ENDPOINT_CONFIG_NAME = "Endpoint";
+ private static final String SHARED_ACCESS_KEY_NAME_CONFIG_NAME = "SharedAccessKeyName";
+ private static final String SHARED_ACCESS_KEY_CONFIG_NAME = "SharedAccessKey";
+ private static final String ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignature";
+ private static final String SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME = "SharedAccessSignatureToken";
+ private static final String TRANSPORT_TYPE_CONFIG_NAME = "TransportType";
+ private static final String ENTITY_PATH_CONFIG_NAME = "EntityPath";
+ private static final String OPERATION_TIMEOUT_CONFIG_NAME = "OperationTimeout";
+ private static final String RETRY_POLICY_CONFIG_NAME = "RetryPolicy";
+ private static final String KEY_VALUE_SEPARATOR = "=";
+ private static final String KEY_VALUE_PAIR_DELIMITER = ";";
+
+ private static final String ALL_KEY_ENUMERATE_REGEX = "(" + HOSTNAME_CONFIG_NAME + "|" + ENDPOINT_CONFIG_NAME + "|" + SHARED_ACCESS_KEY_NAME_CONFIG_NAME
+ + "|" + SHARED_ACCESS_KEY_CONFIG_NAME + "|" + SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + ENTITY_PATH_CONFIG_NAME + "|" + OPERATION_TIMEOUT_CONFIG_NAME
+ + "|" + RETRY_POLICY_CONFIG_NAME + "|" + ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME + "|" + TRANSPORT_TYPE_CONFIG_NAME + "|" + ")";
private static final String KEYS_WITH_DELIMITERS_REGEX = KEY_VALUE_PAIR_DELIMITER + ALL_KEY_ENUMERATE_REGEX + KEY_VALUE_SEPARATOR;
@@ -75,13 +76,11 @@ public class ConnectionStringBuilder
* Default operation timeout if timeout is not specified in the connection string. 30 seconds.
*/
public static final Duration DefaultOperationTimeout = Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS);
-
private ConnectionStringBuilder(
final URI endpointAddress,
final String entityPath,
final Duration operationTimeout,
- final RetryPolicy retryPolicy)
- {
+ final RetryPolicy retryPolicy) {
this.endpoint = endpointAddress;
this.operationTimeout = operationTimeout;
this.retryPolicy = retryPolicy;
@@ -94,8 +93,7 @@ private ConnectionStringBuilder(
final String sharedAccessKeyName,
final String sharedAccessKey,
final Duration operationTimeout,
- final RetryPolicy retryPolicy)
- {
+ final RetryPolicy retryPolicy) {
this(endpointAddress, entityPath, operationTimeout, retryPolicy);
this.sharedAccessKey = sharedAccessKey;
this.sharedAccessKeyName = sharedAccessKeyName;
@@ -106,8 +104,7 @@ private ConnectionStringBuilder(
final String entityPath,
final String sharedAccessSingatureToken,
final Duration operationTimeout,
- final RetryPolicy retryPolicy)
- {
+ final RetryPolicy retryPolicy) {
this(endpointAddress, entityPath, operationTimeout, retryPolicy);
this.sharedAccessSingatureToken = sharedAccessSingatureToken;
}
@@ -118,8 +115,7 @@ private ConnectionStringBuilder(
final String sharedAccessKeyName,
final String sharedAccessKey,
final Duration operationTimeout,
- final RetryPolicy retryPolicy)
- {
+ final RetryPolicy retryPolicy) {
this(Util.convertNamespaceToEndPointURI(namespaceName), entityPath, sharedAccessKeyName, sharedAccessKey, operationTimeout, retryPolicy);
}
@@ -128,8 +124,7 @@ private ConnectionStringBuilder(
final String entityPath,
final String sharedAccessSingatureToken,
final Duration operationTimeout,
- final RetryPolicy retryPolicy)
- {
+ final RetryPolicy retryPolicy) {
this(Util.convertNamespaceToEndPointURI(namespaceName), entityPath, sharedAccessSingatureToken, operationTimeout, retryPolicy);
}
@@ -144,8 +139,7 @@ public ConnectionStringBuilder(
final String namespaceName,
final String entityPath,
final String sharedAccessKeyName,
- final String sharedAccessKey)
- {
+ final String sharedAccessKey) {
this(namespaceName, entityPath, sharedAccessKeyName, sharedAccessKey, ConnectionStringBuilder.DefaultOperationTimeout, RetryPolicy.getDefault());
}
@@ -158,8 +152,7 @@ public ConnectionStringBuilder(
public ConnectionStringBuilder(
final String namespaceName,
final String entityPath,
- final String sharedAccessSingature)
- {
+ final String sharedAccessSingature) {
this(namespaceName, entityPath, sharedAccessSingature, ConnectionStringBuilder.DefaultOperationTimeout, RetryPolicy.getDefault());
}
@@ -175,8 +168,7 @@ public ConnectionStringBuilder(
final URI endpointAddress,
final String entityPath,
final String sharedAccessKeyName,
- final String sharedAccessKey)
- {
+ final String sharedAccessKey) {
this(endpointAddress, entityPath, sharedAccessKeyName, sharedAccessKey, ConnectionStringBuilder.DefaultOperationTimeout, RetryPolicy.getDefault());
}
@@ -189,8 +181,7 @@ public ConnectionStringBuilder(
public ConnectionStringBuilder(
final URI endpointAddress,
final String entityPath,
- final String sharedAccessSingature)
- {
+ final String sharedAccessSingature) {
this(endpointAddress, entityPath, sharedAccessSingature, ConnectionStringBuilder.DefaultOperationTimeout, RetryPolicy.getDefault());
}
@@ -202,8 +193,7 @@ public ConnectionStringBuilder(
* @param connectionString ServiceBus ConnectionString
* @throws IllegalConnectionStringFormatException when the format of the ConnectionString is not valid
*/
- public ConnectionStringBuilder(String connectionString)
- {
+ public ConnectionStringBuilder(String connectionString) {
this.parseConnectionString(connectionString);
}
@@ -215,8 +205,7 @@ public ConnectionStringBuilder(String connectionString)
* @param namespaceConnectionString connections string of the ServiceBus namespace. This doesn't include the entity path.
* @param entityPath path to the entity within the namespace
*/
- public ConnectionStringBuilder(String namespaceConnectionString, String entityPath)
- {
+ public ConnectionStringBuilder(String namespaceConnectionString, String entityPath) {
this(namespaceConnectionString);
this.entityPath = entityPath;
}
@@ -225,8 +214,7 @@ public ConnectionStringBuilder(String namespaceConnectionString, String entityPa
* Get the endpoint which can be used to connect to the ServiceBus Namespace
* @return Endpoint representing the service bus namespace
*/
- public URI getEndpoint()
- {
+ public URI getEndpoint() {
return this.endpoint;
}
@@ -234,8 +222,7 @@ public URI getEndpoint()
* Get the shared access policy key value from the connection string or null.
* @return Shared Access Signature key value
*/
- public String getSasKey()
- {
+ public String getSasKey() {
return this.sharedAccessKey;
}
@@ -243,8 +230,7 @@ public String getSasKey()
* Get the shared access policy owner name from the connection string or null.
* @return Shared Access Signature key name
*/
- public String getSasKeyName()
- {
+ public String getSasKeyName() {
return this.sharedAccessKeyName;
}
@@ -252,8 +238,7 @@ public String getSasKeyName()
* Returns the shared access signature token from the connection string or null.
* @return Shared Access Signature Token
*/
- public String getSharedAccessSignatureToken()
- {
+ public String getSharedAccessSignatureToken() {
return this.sharedAccessSingatureToken;
}
@@ -261,8 +246,7 @@ public String getSharedAccessSignatureToken()
* Get the entity path value from the connection string
* @return Entity Path
*/
- public String getEntityPath()
- {
+ public String getEntityPath() {
return this.entityPath;
}
@@ -271,8 +255,7 @@ public String getEntityPath()
* This value will be used by all operations which uses this {@link ConnectionStringBuilder}, unless explicitly over-ridden.
* @return operationTimeout
*/
- public Duration getOperationTimeout()
- {
+ public Duration getOperationTimeout() {
return (this.operationTimeout == null ? ConnectionStringBuilder.DefaultOperationTimeout : this.operationTimeout);
}
@@ -281,8 +264,7 @@ public Duration getOperationTimeout()
*
ConnectionString with operationTimeout is not inter-operable between java and clients in other platforms.
* @param operationTimeout Operation Timeout
*/
- public void setOperationTimeout(final Duration operationTimeout)
- {
+ public void setOperationTimeout(final Duration operationTimeout) {
this.operationTimeout = operationTimeout;
}
@@ -290,8 +272,7 @@ public void setOperationTimeout(final Duration operationTimeout)
* Get the retry policy instance that was created as part of this builder's creation.
* @return RetryPolicy applied for any operation performed using this ConnectionString
*/
- public RetryPolicy getRetryPolicy()
- {
+ public RetryPolicy getRetryPolicy() {
return (this.retryPolicy == null ? RetryPolicy.getDefault() : this.retryPolicy);
}
@@ -300,8 +281,7 @@ public RetryPolicy getRetryPolicy()
*
RetryPolicy is not Serialized as part of {@link ConnectionStringBuilder#toString()} and is not interoperable with ServiceBus clients in other platforms.
* @param retryPolicy RetryPolicy applied for any operation performed using this ConnectionString
*/
- public void setRetryPolicy(final RetryPolicy retryPolicy)
- {
+ public void setRetryPolicy(final RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
@@ -312,8 +292,7 @@ public void setRetryPolicy(final RetryPolicy retryPolicy)
*
* @return transportType
*/
- public TransportType getTransportType()
- {
+ public TransportType getTransportType() {
return (this.transportType == null ? TransportType.AMQP : transportType);
}
@@ -323,8 +302,7 @@ public TransportType getTransportType()
* @param transportType Transport Type
* @return the {@link ConnectionStringBuilder} instance being set.
*/
- public ConnectionStringBuilder setTransportType(final TransportType transportType)
- {
+ public ConnectionStringBuilder setTransportType(final TransportType transportType) {
this.transportType = transportType;
return this;
}
@@ -334,55 +312,45 @@ public ConnectionStringBuilder setTransportType(final TransportType transportTyp
* @return connection string
*/
@Override
- public String toString()
- {
- if (StringUtil.isNullOrWhiteSpace(this.connectionString))
- {
+ public String toString() {
+ if (StringUtil.isNullOrWhiteSpace(this.connectionString)) {
StringBuilder connectionStringBuilder = new StringBuilder();
- if (this.endpoint != null)
- {
+ if (this.endpoint != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENDPOINT_CONFIG_NAME, KEY_VALUE_SEPARATOR,
this.endpoint.toString(), KEY_VALUE_PAIR_DELIMITER));
}
- if (!StringUtil.isNullOrWhiteSpace(this.entityPath))
- {
+ if (!StringUtil.isNullOrWhiteSpace(this.entityPath)) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENTITY_PATH_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.entityPath, KEY_VALUE_PAIR_DELIMITER));
}
- if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKeyName))
- {
+ if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKeyName)) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", SHARED_ACCESS_KEY_NAME_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.sharedAccessKeyName, KEY_VALUE_PAIR_DELIMITER));
}
- if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKey))
- {
+ if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessKey)) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s", SHARED_ACCESS_KEY_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.sharedAccessKey));
}
- if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessSingatureToken))
- {
+ if (!StringUtil.isNullOrWhiteSpace(this.sharedAccessSingatureToken)) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s", sharedAccessSignatureTokenKeyName,
KEY_VALUE_SEPARATOR, this.sharedAccessSingatureToken));
}
- if (this.operationTimeout != null)
- {
+ if (this.operationTimeout != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", KEY_VALUE_PAIR_DELIMITER, OPERATION_TIMEOUT_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.operationTimeout.toString()));
}
- if (this.retryPolicy != null)
- {
+ if (this.retryPolicy != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", KEY_VALUE_PAIR_DELIMITER, RETRY_POLICY_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.retryPolicy.toString()));
}
- if (this.transportType != null)
- {
+ if (this.transportType != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", KEY_VALUE_PAIR_DELIMITER, TRANSPORT_TYPE_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.transportType.toString()));
}
@@ -393,11 +361,9 @@ public String toString()
return this.connectionString;
}
- private void parseConnectionString(String connectionString)
- {
+ private void parseConnectionString(String connectionString) {
// TODO: Trace and throw
- if (StringUtil.isNullOrWhiteSpace(connectionString))
- {
+ if (StringUtil.isNullOrWhiteSpace(connectionString)) {
throw new IllegalConnectionStringFormatException(String.format("connectionString cannot be empty"));
}
@@ -407,129 +373,92 @@ private void parseConnectionString(String connectionString)
String[] values = keyValuePattern.split(connection);
Matcher keys = keyValuePattern.matcher(connection);
- if (values == null || values.length <= 1 || keys.groupCount() == 0)
- {
+ if (values == null || values.length <= 1 || keys.groupCount() == 0) {
throw new IllegalConnectionStringFormatException("Connection String cannot be parsed.");
}
- if (!StringUtil.isNullOrWhiteSpace((values[0])))
- {
+ if (!StringUtil.isNullOrWhiteSpace((values[0]))) {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Cannot parse part of ConnectionString: %s", values[0]));
}
int valueIndex = 0;
- while (keys.find())
- {
+ while (keys.find()) {
valueIndex++;
String key = keys.group();
key = key.substring(1, key.length() - 1);
- if (values.length < valueIndex + 1)
- {
+ if (values.length < valueIndex + 1) {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Value for the connection string parameter name: %s, not found", key));
}
- if (key.equalsIgnoreCase(ENDPOINT_CONFIG_NAME))
- {
- if (this.endpoint != null)
- {
+ if (key.equalsIgnoreCase(ENDPOINT_CONFIG_NAME)) {
+ if (this.endpoint != null) {
// we have parsed the endpoint once, which means we have multiple config which is not allowed
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", ENDPOINT_CONFIG_NAME, HOSTNAME_CONFIG_NAME));
}
- try
- {
+ try {
this.endpoint = new URI(values[valueIndex]);
- }
- catch(URISyntaxException exception)
- {
+ } catch (URISyntaxException exception) {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "%s should be in format scheme://fullyQualifiedServiceBusNamespaceEndpointName", ENDPOINT_CONFIG_NAME),
exception);
}
- }
- else if (key.equalsIgnoreCase(HOSTNAME_CONFIG_NAME))
- {
- if (this.endpoint != null)
- {
+ } else if (key.equalsIgnoreCase(HOSTNAME_CONFIG_NAME)) {
+ if (this.endpoint != null) {
// we have parsed the endpoint once, which means we have multiple config which is not allowed
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Multiple %s and/or %s detected. Make sure only one is defined", ENDPOINT_CONFIG_NAME, HOSTNAME_CONFIG_NAME));
}
- try
- {
+ try {
this.endpoint = new URI(String.format(Locale.US, END_POINT_RAW_FORMAT, values[valueIndex]));
- }
- catch(URISyntaxException exception)
- {
+ } catch (URISyntaxException exception) {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "%s should be a fully quantified host name address", HOSTNAME_CONFIG_NAME),
exception);
}
- }
- else if(key.equalsIgnoreCase(SHARED_ACCESS_KEY_NAME_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(SHARED_ACCESS_KEY_NAME_CONFIG_NAME)) {
this.sharedAccessKeyName = values[valueIndex];
- }
- else if(key.equalsIgnoreCase(SHARED_ACCESS_KEY_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(SHARED_ACCESS_KEY_CONFIG_NAME)) {
this.sharedAccessKey = values[valueIndex];
- }
- else if(key.equalsIgnoreCase(SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME)) {
this.sharedAccessSingatureToken = values[valueIndex];
this.sharedAccessSignatureTokenKeyName = SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME;
- }
- else if(key.equalsIgnoreCase(ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME)) {
this.sharedAccessSingatureToken = values[valueIndex];
this.sharedAccessSignatureTokenKeyName = ALTERNATE_SHARED_ACCESS_SIGNATURE_TOKEN_CONFIG_NAME;
- }
- else if (key.equalsIgnoreCase(ENTITY_PATH_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(ENTITY_PATH_CONFIG_NAME)) {
this.entityPath = values[valueIndex];
- }
- else if (key.equalsIgnoreCase(OPERATION_TIMEOUT_CONFIG_NAME))
- {
- try
- {
+ } else if (key.equalsIgnoreCase(OPERATION_TIMEOUT_CONFIG_NAME)) {
+ try {
this.operationTimeout = Duration.parse(values[valueIndex]);
- }
- catch(DateTimeParseException exception)
- {
+ } catch (DateTimeParseException exception) {
throw new IllegalConnectionStringFormatException("Invalid value specified for property 'Duration' in the ConnectionString.", exception);
}
- }
- else if (key.equalsIgnoreCase(RETRY_POLICY_CONFIG_NAME))
- {
+ } else if (key.equalsIgnoreCase(RETRY_POLICY_CONFIG_NAME)) {
this.retryPolicy = values[valueIndex].equals(ClientConstants.DEFAULT_RETRY)
? RetryPolicy.getDefault()
- : (values[valueIndex].equals(ClientConstants.NO_RETRY) ? RetryPolicy.getNoRetry() : null);
+ : (values[valueIndex].equals(ClientConstants.NO_RETRY) ? RetryPolicy.getNoRetry() : null);
- if (this.retryPolicy == null)
- throw new IllegalConnectionStringFormatException(
- String.format(Locale.US, "Connection string parameter '%s'='%s' is not recognized",
- RETRY_POLICY_CONFIG_NAME, values[valueIndex]));
- }
- else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME))
- {
- try
- {
+ if (this.retryPolicy == null) {
+ throw new IllegalConnectionStringFormatException(
+ String.format(Locale.US, "Connection string parameter '%s'='%s' is not recognized",
+ RETRY_POLICY_CONFIG_NAME, values[valueIndex]));
+ }
+ } else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME)) {
+ try {
this.transportType = TransportType.fromString(values[valueIndex]);
- } catch (IllegalArgumentException exception)
- {
+ } catch (IllegalArgumentException exception) {
throw new IllegalConnectionStringFormatException(
String.format("Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME),
exception);
}
- }
- else
- {
+ } else {
throw new IllegalConnectionStringFormatException(
String.format(Locale.US, "Illegal connection string parameter name: %s", key));
}
@@ -537,17 +466,14 @@ else if (key.equalsIgnoreCase(TRANSPORT_TYPE_CONFIG_NAME))
}
// Generates a string that is logged in traces. Excludes secrets
- public String toLoggableString()
- {
+ public String toLoggableString() {
StringBuilder connectionStringBuilder = new StringBuilder();
- if (this.endpoint != null)
- {
+ if (this.endpoint != null) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENDPOINT_CONFIG_NAME, KEY_VALUE_SEPARATOR,
this.endpoint.toString(), KEY_VALUE_PAIR_DELIMITER));
}
- if (!StringUtil.isNullOrWhiteSpace(this.entityPath))
- {
+ if (!StringUtil.isNullOrWhiteSpace(this.entityPath)) {
connectionStringBuilder.append(String.format(Locale.US, "%s%s%s%s", ENTITY_PATH_CONFIG_NAME,
KEY_VALUE_SEPARATOR, this.entityPath, KEY_VALUE_PAIR_DELIMITER));
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java
index 7af403683386..a61a68f0f264 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java
@@ -11,10 +11,12 @@
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.transaction.*;
+import org.apache.qpid.proton.amqp.transaction.Coordinator;
+import org.apache.qpid.proton.amqp.transaction.Declare;
+import org.apache.qpid.proton.amqp.transaction.Declared;
+import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.transport.Target;
-import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +34,7 @@ class Controller {
private URI namespaceEndpointURI;
private ClientSettings clientSettings;
- public Controller (URI namespaceEndpointURI, MessagingFactory factory, ClientSettings clientSettings) {
+ Controller(URI namespaceEndpointURI, MessagingFactory factory, ClientSettings clientSettings) {
this.namespaceEndpointURI = namespaceEndpointURI;
this.messagingFactory = factory;
this.clientSettings = clientSettings;
@@ -75,7 +77,7 @@ public CompletableFuture declareAsync() {
return this.internalSender.sendAndReturnDeliveryStateAsync(
message,
TransactionContext.NULL_TXN)
- .thenApply( state -> {
+ .thenApply(state -> {
Binary txnId = null;
if (state instanceof Declared) {
Declared declared = (Declared) state;
@@ -100,11 +102,10 @@ public CompletableFuture dischargeAsync(Binary txnId, boolean isCommit) {
return this.internalSender.sendAndReturnDeliveryStateAsync(
message,
TransactionContext.NULL_TXN)
- .thenCompose( state -> {
+ .thenCompose(state -> {
if (state instanceof Accepted) {
return CompletableFuture.completedFuture(null);
- }
- else {
+ } else {
CompletableFuture returnTask = new CompletableFuture<>();
returnTask.completeExceptionally(new UnsupportedOperationException("Received unknown state: " + state.toString()));
return returnTask;
@@ -116,8 +117,7 @@ protected CompletableFuture closeAsync() {
return null;
}
- private static SenderLinkSettings getControllerLinkSettings(MessagingFactory underlyingFactory)
- {
+ private static SenderLinkSettings getControllerLinkSettings(MessagingFactory underlyingFactory) {
SenderLinkSettings linkSettings = new SenderLinkSettings();
linkSettings.linkPath = "coordinator";
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IErrorContextProvider.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IErrorContextProvider.java
index 67ce9bed2163..693a9a888432 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IErrorContextProvider.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IErrorContextProvider.java
@@ -3,7 +3,6 @@
package com.microsoft.azure.servicebus.primitives;
-interface IErrorContextProvider
-{
+interface IErrorContextProvider {
ErrorContext getContext();
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IllegalConnectionStringFormatException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IllegalConnectionStringFormatException.java
index f63995af8e1a..fa2482ae766c 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IllegalConnectionStringFormatException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IllegalConnectionStringFormatException.java
@@ -7,26 +7,21 @@
* This exception is thrown when the connection string provided does not meet the requirement for connection.
* @since 1.0
*/
-public class IllegalConnectionStringFormatException extends IllegalArgumentException
-{
+public class IllegalConnectionStringFormatException extends IllegalArgumentException {
private static final long serialVersionUID = 2514898858133972030L;
- IllegalConnectionStringFormatException()
- {
+ IllegalConnectionStringFormatException() {
}
- IllegalConnectionStringFormatException(String detail)
- {
+ IllegalConnectionStringFormatException(String detail) {
super(detail);
}
- IllegalConnectionStringFormatException(Throwable cause)
- {
+ IllegalConnectionStringFormatException(Throwable cause) {
super(cause);
}
- IllegalConnectionStringFormatException(String detail, Throwable cause)
- {
+ IllegalConnectionStringFormatException(String detail, Throwable cause) {
super(detail, cause);
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IteratorUtil.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IteratorUtil.java
index 51aa0f8d4c41..cb261e9363e5 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IteratorUtil.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/IteratorUtil.java
@@ -5,27 +5,20 @@
import java.util.Iterator;
-final class IteratorUtil
-{
- private IteratorUtil()
- {
+final class IteratorUtil {
+ private IteratorUtil() {
}
- public static boolean sizeEquals(Iterable iterable, int expectedSize)
- {
+ public static boolean sizeEquals(Iterable iterable, int expectedSize) {
Iterator iterator = iterable.iterator();
int currentSize = 0;
- while(iterator.hasNext())
- {
- if (expectedSize > currentSize)
- {
+ while (iterator.hasNext()) {
+ if (expectedSize > currentSize) {
currentSize++;
iterator.next();
continue;
- }
- else
- {
+ } else {
return false;
}
}
@@ -33,27 +26,22 @@ public static boolean sizeEquals(Iterable iterable, int expectedSize)
return true;
}
- public static T getLast(Iterator iterator)
- {
+ public static T getLast(Iterator iterator) {
T last = null;
- while(iterator.hasNext())
- {
+ while (iterator.hasNext()) {
last = iterator.next();
}
return last;
}
- public static T getFirst(final Iterable iterable)
- {
- if (iterable == null)
- {
+ public static T getFirst(final Iterable iterable) {
+ if (iterable == null) {
return null;
}
final Iterator iterator = iterable.iterator();
- if (iterator == null)
- {
+ if (iterator == null) {
return null;
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageLockLostException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageLockLostException.java
index 61e780fc3cb5..7470c3f07699 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageLockLostException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageLockLostException.java
@@ -13,23 +13,19 @@ public class MessageLockLostException extends ServiceBusException {
private static final long serialVersionUID = -203350475131556600L;
- public MessageLockLostException()
- {
+ public MessageLockLostException() {
super(false);
}
- public MessageLockLostException(String message)
- {
+ public MessageLockLostException(String message) {
super(false, message);
}
- public MessageLockLostException(Throwable cause)
- {
+ public MessageLockLostException(Throwable cause) {
super(false, cause);
}
- public MessageLockLostException(String message, Throwable cause)
- {
+ public MessageLockLostException(String message, Throwable cause) {
super(false, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageNotFoundException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageNotFoundException.java
index bbbc5db7bbc1..d2b73cfd08f4 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageNotFoundException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageNotFoundException.java
@@ -12,23 +12,19 @@ public class MessageNotFoundException extends ServiceBusException {
private static final long serialVersionUID = -7138414297734634975L;
- public MessageNotFoundException()
- {
+ public MessageNotFoundException() {
super(false);
}
- public MessageNotFoundException(String message)
- {
+ public MessageNotFoundException(String message) {
super(false, message);
}
- public MessageNotFoundException(Throwable cause)
- {
+ public MessageNotFoundException(Throwable cause) {
super(false, cause);
}
- public MessageNotFoundException(String message, Throwable cause)
- {
+ public MessageNotFoundException(String message, Throwable cause) {
super(false, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageWithLockToken.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageWithLockToken.java
index beff523a36f7..088c09517f59 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageWithLockToken.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessageWithLockToken.java
@@ -11,8 +11,7 @@ public class MessageWithLockToken {
private final Message message;
private final UUID lockToken;
- public MessageWithLockToken(Message message, UUID lockToken)
- {
+ public MessageWithLockToken(Message message, UUID lockToken) {
this.message = message;
this.lockToken = lockToken;
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityAlreadyExistsException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityAlreadyExistsException.java
index a45e1c9512d5..815be31cd60b 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityAlreadyExistsException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityAlreadyExistsException.java
@@ -12,13 +12,11 @@ public class MessagingEntityAlreadyExistsException extends ServiceBusException {
private static final long serialVersionUID = -3652949479773950838L;
- public MessagingEntityAlreadyExistsException(String message)
- {
+ public MessagingEntityAlreadyExistsException(String message) {
super(false, message);
}
- public MessagingEntityAlreadyExistsException(String message, Throwable cause)
- {
+ public MessagingEntityAlreadyExistsException(String message, Throwable cause) {
super(false, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityDisabledException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityDisabledException.java
index 73e62449a46b..d87f4c11b273 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityDisabledException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityDisabledException.java
@@ -8,27 +8,23 @@
* @since 1.0
*
*/
-public class MessagingEntityDisabledException extends ServiceBusException{
+public class MessagingEntityDisabledException extends ServiceBusException {
private static final long serialVersionUID = 9086472912026637605L;
- public MessagingEntityDisabledException()
- {
+ public MessagingEntityDisabledException() {
super(false);
}
- public MessagingEntityDisabledException(String message)
- {
+ public MessagingEntityDisabledException(String message) {
super(false, message);
}
- public MessagingEntityDisabledException(Throwable cause)
- {
+ public MessagingEntityDisabledException(Throwable cause) {
super(false, cause);
}
- public MessagingEntityDisabledException(String message, Throwable cause)
- {
+ public MessagingEntityDisabledException(String message, Throwable cause) {
super(false, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityNotFoundException.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityNotFoundException.java
index 3b8dc477c54e..78e10971dc41 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityNotFoundException.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityNotFoundException.java
@@ -12,23 +12,19 @@ public class MessagingEntityNotFoundException extends ServiceBusException {
private static final long serialVersionUID = -4624769494653591824L;
- public MessagingEntityNotFoundException()
- {
+ public MessagingEntityNotFoundException() {
super(false);
}
- public MessagingEntityNotFoundException(String message)
- {
+ public MessagingEntityNotFoundException(String message) {
super(false, message);
}
- public MessagingEntityNotFoundException(Throwable cause)
- {
+ public MessagingEntityNotFoundException(Throwable cause) {
super(false, cause);
}
- public MessagingEntityNotFoundException(String message, Throwable cause)
- {
+ public MessagingEntityNotFoundException(String message, Throwable cause) {
super(false, message, cause);
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityType.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityType.java
index efe34e1b7305..e884e42a0013 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityType.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingEntityType.java
@@ -12,13 +12,11 @@ public enum MessagingEntityType {
FILTER(3);
private int enumValue;
- MessagingEntityType(int enumValue)
- {
+ MessagingEntityType(int enumValue) {
this.enumValue = enumValue;
}
- public int getIntValue()
- {
+ public int getIntValue() {
return this.enumValue;
}
}
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
index fd5e9ff3f821..954a2070f1c1 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MessagingFactory.java
@@ -19,7 +19,13 @@
import com.microsoft.azure.servicebus.TransactionContext;
import com.microsoft.azure.servicebus.Utils;
-import com.microsoft.azure.servicebus.amqp.*;
+import com.microsoft.azure.servicebus.amqp.BaseLinkHandler;
+import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
+import com.microsoft.azure.servicebus.amqp.DispatchHandler;
+import com.microsoft.azure.servicebus.amqp.IAmqpConnection;
+import com.microsoft.azure.servicebus.amqp.ProtonUtil;
+import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
+import com.microsoft.azure.servicebus.amqp.ReactorHandler;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
@@ -44,8 +50,7 @@
* or clients using the same MessagingFactory instance, all those senders, receivers or clients will share the same connection to the namespace.
* @since 1.0
*/
-public class MessagingFactory extends ClientEntity implements IAmqpConnection
-{
+public class MessagingFactory extends ClientEntity implements IAmqpConnection {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessagingFactory.class);
public static final ExecutorService INTERNAL_THREAD_POOL = Executors.newCachedThreadPool();
@@ -73,8 +78,7 @@ public class MessagingFactory extends ClientEntity implements IAmqpConnection
private final ClientSettings clientSettings;
- private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings)
- {
+ private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings) {
super("MessagingFactory".concat(StringUtil.getShortRandomString()));
this.clientSettings = clientSettings;
this.namespaceEndpointUri = namespaceEndpointUri;
@@ -86,11 +90,9 @@ private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings
this.factoryOpenFuture = new CompletableFuture();
this.cbsLinkCreationFuture = new CompletableFuture();
this.managementLinksCache = new RequestResponseLinkCache(this);
- this.reactorHandler = new ReactorHandler()
- {
+ this.reactorHandler = new ReactorHandler() {
@Override
- public void onReactorInit(Event e)
- {
+ public void onReactorInit(Event e) {
super.onReactorInit(e);
final Reactor r = e.getReactor();
@@ -180,33 +182,26 @@ private synchronized CompletableFuture createController() {
}
@Override
- public String getHostName()
- {
+ public String getHostName() {
return this.hostName;
}
- private Reactor getReactor()
- {
- synchronized (this.reactorLock)
- {
+ private Reactor getReactor() {
+ synchronized (this.reactorLock) {
return this.reactor;
}
}
- private ReactorDispatcher getReactorScheduler()
- {
- synchronized (this.reactorLock)
- {
+ private ReactorDispatcher getReactorScheduler() {
+ synchronized (this.reactorLock) {
return this.reactorScheduler;
}
}
- private void startReactor(ReactorHandler reactorHandler) throws IOException
- {
+ private void startReactor(ReactorHandler reactorHandler) throws IOException {
TRACE_LOGGER.info("Creating and starting reactor");
Reactor newReactor = ProtonUtil.reactor(reactorHandler, this.connectionHandler.getMaxFrameSize());
- synchronized (this.reactorLock)
- {
+ synchronized (this.reactorLock) {
this.reactor = newReactor;
this.reactorScheduler = new ReactorDispatcher(newReactor);
}
@@ -217,10 +212,8 @@ private void startReactor(ReactorHandler reactorHandler) throws IOException
TRACE_LOGGER.info("Started reactor");
}
- Connection getConnection()
- {
- if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED)
- {
+ Connection getConnection() {
+ if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) {
TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.connectionHandler.getOutboundSocketHostName(), this.connectionHandler.getOutboundSocketPort());
this.connection = this.getReactor().connectionToHost(
this.connectionHandler.getOutboundSocketHostName(),
@@ -235,8 +228,7 @@ Connection getConnection()
* Gets the operation timeout from the connections string.
* @return operation timeout specified in the connection string
*/
- public Duration getOperationTimeout()
- {
+ public Duration getOperationTimeout() {
return this.clientSettings.getOperationTimeout();
}
@@ -244,25 +236,20 @@ public Duration getOperationTimeout()
* Gets the retry policy from the connection string.
* @return retry policy specified in the connection string
*/
- public RetryPolicy getRetryPolicy()
- {
+ public RetryPolicy getRetryPolicy() {
return this.clientSettings.getRetryPolicy();
}
- public ClientSettings getClientSettings()
- {
+ public ClientSettings getClientSettings() {
return this.clientSettings;
}
- public static CompletableFuture createFromNamespaceNameAsyc(String sbNamespaceName, ClientSettings clientSettings)
- {
+ public static CompletableFuture createFromNamespaceNameAsyc(String sbNamespaceName, ClientSettings clientSettings) {
return createFromNamespaceEndpointURIAsyc(Util.convertNamespaceToEndPointURI(sbNamespaceName), clientSettings);
}
- public static CompletableFuture createFromNamespaceEndpointURIAsyc(URI namespaceEndpointURI, ClientSettings clientSettings)
- {
- if(TRACE_LOGGER.isInfoEnabled())
- {
+ public static CompletableFuture createFromNamespaceEndpointURIAsyc(URI namespaceEndpointURI, ClientSettings clientSettings) {
+ if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("Creating messaging factory from namespace endpoint uri '{}'", namespaceEndpointURI.toString());
}
@@ -277,13 +264,11 @@ public static CompletableFuture createFromNamespaceEndpointURI
return messagingFactory.factoryOpenFuture;
}
- public static MessagingFactory createFromNamespaceName(String sbNamespaceName, ClientSettings clientSettings) throws InterruptedException, ServiceBusException
- {
+ public static MessagingFactory createFromNamespaceName(String sbNamespaceName, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
return completeFuture(createFromNamespaceNameAsyc(sbNamespaceName, clientSettings));
}
- public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException
- {
+ public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
return completeFuture(createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings));
}
@@ -293,10 +278,8 @@ public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpo
* @return a CompletableFuture which completes when a connection is established to the namespace or when a connection couldn't be established.
* @see java.util.concurrent.CompletableFuture
*/
- public static CompletableFuture createFromConnectionStringBuilderAsync(final ConnectionStringBuilder builder)
- {
- if(TRACE_LOGGER.isInfoEnabled())
- {
+ public static CompletableFuture createFromConnectionStringBuilderAsync(final ConnectionStringBuilder builder) {
+ if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("Creating messaging factory from connection string '{}'", builder.toLoggableString());
}
@@ -309,8 +292,7 @@ public static CompletableFuture createFromConnectionStringBuil
* @return a CompletableFuture which completes when a connection is established to the namespace or when a connection couldn't be established.
* @see java.util.concurrent.CompletableFuture
*/
- public static CompletableFuture createFromConnectionStringAsync(final String connectionString)
- {
+ public static CompletableFuture createFromConnectionStringAsync(final String connectionString) {
ConnectionStringBuilder builder = new ConnectionStringBuilder(connectionString);
return createFromConnectionStringBuilderAsync(builder);
}
@@ -322,8 +304,7 @@ public static CompletableFuture createFromConnectionStringAsyn
* @throws InterruptedException if blocking thread is interrupted
* @throws ExecutionException if a connection couldn't be established to the namespace. Cause of the failure can be found by calling {@link Exception#getCause()}
*/
- public static MessagingFactory createFromConnectionStringBuilder(final ConnectionStringBuilder builder) throws InterruptedException, ExecutionException
- {
+ public static MessagingFactory createFromConnectionStringBuilder(final ConnectionStringBuilder builder) throws InterruptedException, ExecutionException {
return createFromConnectionStringBuilderAsync(builder).get();
}
@@ -334,8 +315,7 @@ public static MessagingFactory createFromConnectionStringBuilder(final Connectio
* @throws InterruptedException if blocking thread is interrupted
* @throws ExecutionException if a connection couldn't be established to the namespace. Cause of the failure can be found by calling {@link Exception#getCause()}
*/
- public static MessagingFactory createFromConnectionString(final String connectionString) throws InterruptedException, ExecutionException
- {
+ public static MessagingFactory createFromConnectionString(final String connectionString) throws InterruptedException, ExecutionException {
return createFromConnectionStringAsync(connectionString).get();
}
@@ -343,18 +323,15 @@ public static MessagingFactory createFromConnectionString(final String connectio
* Internal method. Clients should not use this method.
*/
@Override
- public void onConnectionOpen()
- {
- if(!factoryOpenFuture.isDone())
- {
+ public void onConnectionOpen() {
+ if (!factoryOpenFuture.isDone()) {
TRACE_LOGGER.info("MessagingFactory opened.");
AsyncUtil.completeFuture(this.factoryOpenFuture, this);
}
// Connection opened. Initiate new cbs link creation
TRACE_LOGGER.info("Connection opened to host.");
- if(this.cbsLink == null)
- {
+ if (this.cbsLink == null) {
this.createCBSLinkAsync();
}
}
@@ -363,54 +340,40 @@ public void onConnectionOpen()
* Internal method. Clients should not use this method.
*/
@Override
- public void onConnectionError(ErrorCondition error)
- {
- if(error != null && error.getCondition() != null)
- {
+ public void onConnectionError(ErrorCondition error) {
+ if (error != null && error.getCondition() != null) {
TRACE_LOGGER.error("Connection error. '{}'", error);
}
- if (!this.factoryOpenFuture.isDone())
- {
+ if (!this.factoryOpenFuture.isDone()) {
AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(error));
this.setClosed();
- }
- else
- {
+ } else {
this.closeConnection(error, null);
}
- if (this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone())
- {
+ if (this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone()) {
TRACE_LOGGER.info("Connection to host closed.");
AsyncUtil.completeFuture(this.connetionCloseFuture, null);
Timer.unregister(this.getClientId());
}
}
- private void onReactorError(Exception cause)
- {
- if (!this.factoryOpenFuture.isDone())
- {
+ private void onReactorError(Exception cause) {
+ if (!this.factoryOpenFuture.isDone()) {
TRACE_LOGGER.error("Reactor error occured", cause);
AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, cause);
this.setClosed();
- }
- else
- {
- if(this.getIsClosingOrClosed())
- {
+ } else {
+ if (this.getIsClosingOrClosed()) {
return;
}
TRACE_LOGGER.warn("Reactor error occured", cause);
- try
- {
+ try {
this.startReactor(this.reactorHandler);
- }
- catch (IOException e)
- {
+ } catch (IOException e) {
Marker fatalMarker = MarkerFactory.getMarker(ClientConstants.FATAL_MARKER);
TRACE_LOGGER.error(fatalMarker, "Re-starting reactor failed with exception.", e);
this.onReactorError(cause);
@@ -421,41 +384,32 @@ private void onReactorError(Exception cause)
}
// One of the parameters must be null
- private void closeConnection(ErrorCondition error, Exception cause)
- {
+ private void closeConnection(ErrorCondition error, Exception cause) {
// Important to copy the reference of the connection as a call to getConnection might create a new connection while we are still in this method
Connection currentConnection = this.connection;
- if(currentConnection != null)
- {
+ if (currentConnection != null) {
Link[] links = this.registeredLinks.toArray(new Link[0]);
this.registeredLinks.clear();
TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", links.length);
- for(Link link : links)
- {
+ for (Link link : links) {
link.close();
}
TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", links.length);
- if (currentConnection.getLocalState() != EndpointState.CLOSED)
- {
+ if (currentConnection.getLocalState() != EndpointState.CLOSED) {
TRACE_LOGGER.info("Closing connection to host");
currentConnection.close();
}
- for(Link link : links)
- {
+ for (Link link : links) {
Handler handler = BaseHandler.getHandler(link);
- if (handler != null && handler instanceof BaseLinkHandler)
- {
+ if (handler != null && handler instanceof BaseLinkHandler) {
BaseLinkHandler linkHandler = (BaseLinkHandler) handler;
- if(error != null)
- {
+ if (error != null) {
linkHandler.processOnClose(link, error);
- }
- else
- {
+ } else {
linkHandler.processOnClose(link, cause);
}
}
@@ -464,38 +418,28 @@ private void closeConnection(ErrorCondition error, Exception cause)
}
@Override
- protected CompletableFuture onClose()
- {
- if (!this.getIsClosed())
- {
+ protected CompletableFuture onClose() {
+ if (!this.getIsClosed()) {
TRACE_LOGGER.info("Closing messaging factory");
CompletableFuture cbsLinkCloseFuture;
- if(this.cbsLink == null)
- {
+ if (this.cbsLink == null) {
cbsLinkCloseFuture = CompletableFuture.completedFuture(null);
- }
- else
- {
+ } else {
TRACE_LOGGER.info("Closing CBS link");
cbsLinkCloseFuture = this.cbsLink.closeAsync();
}
cbsLinkCloseFuture.thenRun(() -> this.managementLinksCache.freeAsync()).thenRun(() -> {
- if(this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone())
- {
+ if (this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) {
this.cbsLinkCreationFuture.completeExceptionally(new Exception("Connection closed."));
}
- if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED)
- {
+ if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) {
try {
- this.scheduleOnReactorThread(new DispatchHandler()
- {
+ this.scheduleOnReactorThread(new DispatchHandler() {
@Override
- public void onEvent()
- {
- if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED)
- {
+ public void onEvent() {
+ if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) {
TRACE_LOGGER.info("Closing connection to host");
MessagingFactory.this.connection.close();
}
@@ -505,58 +449,42 @@ public void onEvent()
this.connetionCloseFuture.completeExceptionally(e);
}
- Timer.schedule(new Runnable()
- {
- @Override
- public void run()
- {
- if (!MessagingFactory.this.connetionCloseFuture.isDone())
- {
- String errorMessage = "Closing MessagingFactory timed out.";
- TRACE_LOGGER.warn(errorMessage);
- AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException(errorMessage));
- }
+ Timer.schedule(() -> {
+ if (!MessagingFactory.this.connetionCloseFuture.isDone()) {
+ String errorMessage = "Closing MessagingFactory timed out.";
+ TRACE_LOGGER.warn(errorMessage);
+ AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException(errorMessage));
}
},
- this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
- }
- else
- {
+ this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
+ } else {
this.connetionCloseFuture.complete(null);
Timer.unregister(this.getClientId());
}
});
return this.connetionCloseFuture;
- }
- else
- {
+ } else {
return CompletableFuture.completedFuture(null);
}
}
- private class RunReactor implements Runnable
- {
- final private Reactor rctr;
+ private class RunReactor implements Runnable {
+ private final Reactor rctr;
- public RunReactor()
- {
+ RunReactor() {
this.rctr = MessagingFactory.this.getReactor();
}
- public void run()
- {
+ public void run() {
TRACE_LOGGER.info("starting reactor instance.");
- try
- {
+ try {
this.rctr.setTimeout(3141);
this.rctr.start();
boolean continueProcessing = true;
- while(!Thread.interrupted() && continueProcessing)
- {
+ while (!Thread.interrupted() && continueProcessing) {
// If factory is closed, stop reactor too
- if(MessagingFactory.this.getIsClosed())
- {
+ if (MessagingFactory.this.getIsClosed()) {
TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
break;
}
@@ -564,38 +492,32 @@ public void run()
}
TRACE_LOGGER.info("Stopping reactor");
this.rctr.stop();
- }
- catch (HandlerException handlerException)
- {
+ } catch (HandlerException handlerException) {
Throwable cause = handlerException.getCause();
- if (cause == null)
- {
+ if (cause == null) {
cause = handlerException;
}
TRACE_LOGGER.warn("UnHandled exception while processing events in reactor:", handlerException);
- String message = !StringUtil.isNullOrEmpty(cause.getMessage()) ?
- cause.getMessage():
- !StringUtil.isNullOrEmpty(handlerException.getMessage()) ?
- handlerException.getMessage() :
- "Reactor encountered unrecoverable error";
+ String message = !StringUtil.isNullOrEmpty(cause.getMessage())
+ ? cause.getMessage()
+ : !StringUtil.isNullOrEmpty(handlerException.getMessage())
+ ? handlerException.getMessage()
+ : "Reactor encountered unrecoverable error";
ServiceBusException sbException = new ServiceBusException(
true,
String.format(Locale.US, "%s, %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
cause);
- if (cause instanceof UnresolvedAddressException)
- {
+ if (cause instanceof UnresolvedAddressException) {
sbException = new CommunicationException(
String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()),
cause);
}
MessagingFactory.this.onReactorError(sbException);
- }
- finally
- {
+ } finally {
this.rctr.free();
}
}
@@ -605,10 +527,8 @@ public void run()
* Internal method. Clients should not use this method.
*/
@Override
- public void registerForConnectionError(Link link)
- {
- if(link != null)
- {
+ public void registerForConnectionError(Link link) {
+ if (link != null) {
this.registeredLinks.add(link);
}
}
@@ -617,89 +537,67 @@ public void registerForConnectionError(Link link)
* Internal method. Clients should not use this method.
*/
@Override
- public void deregisterForConnectionError(Link link)
- {
- if(link != null)
- {
+ public void deregisterForConnectionError(Link link) {
+ if (link != null) {
this.registeredLinks.remove(link);
}
}
- void scheduleOnReactorThread(final DispatchHandler handler) throws IOException
- {
+ void scheduleOnReactorThread(final DispatchHandler handler) throws IOException {
this.getReactorScheduler().invoke(handler);
}
- void scheduleOnReactorThread(final int delay, final DispatchHandler handler) throws IOException
- {
+ void scheduleOnReactorThread(final int delay, final DispatchHandler handler) throws IOException {
this.getReactorScheduler().invoke(delay, handler);
}
- CompletableFuture sendSecurityToken(String sasTokenAudienceUri)
- {
+ CompletableFuture sendSecurityToken(String sasTokenAudienceUri) {
TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceUri);
CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceUri);
- return tokenFuture.thenComposeAsync((t) ->
- {
+ return tokenFuture.thenComposeAsync((t) -> {
SecurityToken generatedSecurityToken = t;
- CompletableFuture sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> {
- return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken);
- }, MessagingFactory.INTERNAL_THREAD_POOL);
+ CompletableFuture sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), generatedSecurityToken), MessagingFactory.INTERNAL_THREAD_POOL);
- return sendTokenFuture.thenAccept((v) -> {
- TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceUri);});
+ return sendTokenFuture.thenAccept((v) -> TRACE_LOGGER.debug("Sent token for {}", sasTokenAudienceUri));
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- CompletableFuture> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer)
- {
+ CompletableFuture> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer) {
CompletableFuture> result = new CompletableFuture>();
TRACE_LOGGER.debug("Sending token for {}", sasTokenAudienceURI);
CompletableFuture sendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
sendTokenFuture.handleAsync((validUntil, sendTokenEx) -> {
- if(sendTokenEx == null)
- {
+ if (sendTokenEx == null) {
TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
ScheduledFuture> renewalFuture = MessagingFactory.scheduleRenewTimer(validUntil, validityRenewer);
result.complete(renewalFuture);
- }
- else
- {
+ } else {
Throwable sendFailureCause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx);
TRACE_LOGGER.warn("Sending CBS Token for {} failed.", sasTokenAudienceURI, sendFailureCause);
- if(retryOnFailure)
- {
+ if (retryOnFailure) {
// Just schedule another attempt
TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", sasTokenAudienceURI, ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS);
ScheduledFuture> renewalFuture = Timer.schedule(validityRenewer, Duration.ofSeconds(ClientConstants.DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS), TimerType.OneTimeRun);
result.complete(renewalFuture);
- }
- else
- {
- if(sendFailureCause instanceof TimeoutException)
- {
+ } else {
+ if (sendFailureCause instanceof TimeoutException) {
// Retry immediately on timeout. This is a special case as CBSLink may be disconnected right after the token is sent, but before it reaches the service
TRACE_LOGGER.debug("Resending token for {}", sasTokenAudienceURI);
CompletableFuture resendTokenFuture = this.generateAndSendSecurityToken(sasTokenAudienceURI);
resendTokenFuture.handleAsync((resendValidUntil, resendTokenEx) -> {
- if(resendTokenEx == null)
- {
+ if (resendTokenEx == null) {
TRACE_LOGGER.debug("Sent CBS token for {} and setting renew timer", sasTokenAudienceURI);
ScheduledFuture> renewalFuture = MessagingFactory.scheduleRenewTimer(resendValidUntil, validityRenewer);
result.complete(renewalFuture);
- }
- else
- {
+ } else {
Throwable resendFailureCause = ExceptionUtil.extractAsyncCompletionCause(resendTokenEx);
TRACE_LOGGER.warn("Resending CBS Token for {} failed.", sasTokenAudienceURI, resendFailureCause);
result.completeExceptionally(resendFailureCause);
}
return null;
}, MessagingFactory.INTERNAL_THREAD_POOL);
- }
- else
- {
+ } else {
result.completeExceptionally(sendFailureCause);
}
}
@@ -710,95 +608,69 @@ CompletableFuture> sendSecurityTokenAndSetRenewTimer(String s
return result;
}
- private CompletableFuture generateAndSendSecurityToken(String sasTokenAudienceURI)
- {
+ private CompletableFuture generateAndSendSecurityToken(String sasTokenAudienceURI) {
CompletableFuture tokenFuture = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
- return tokenFuture.thenComposeAsync((t) ->
- {
+ return tokenFuture.thenComposeAsync((t) -> {
SecurityToken generatedSecurityToken = t;
- return this.cbsLinkCreationFuture.thenComposeAsync((v) -> {
- return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil());
- }, MessagingFactory.INTERNAL_THREAD_POOL);
+ return this.cbsLinkCreationFuture.thenComposeAsync((v) -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, ClientConstants.SAS_TOKEN_SEND_TIMEOUT, generatedSecurityToken).thenApply((u) -> generatedSecurityToken.getValidUntil()), MessagingFactory.INTERNAL_THREAD_POOL);
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- private static ScheduledFuture> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer)
- {
- if(currentTokenValidUntil == Instant.MAX)
- {
+ private static ScheduledFuture> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer) {
+ if (currentTokenValidUntil == Instant.MAX) {
// User provided token or will never expire
return null;
- }
- else
- {
+ } else {
// It will eventually expire. Renew it
- int renewInterval = Util.getTokenRenewIntervalInSeconds((int)Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
+ int renewInterval = Util.getTokenRenewIntervalInSeconds((int) Duration.between(Instant.now(), currentTokenValidUntil).getSeconds());
return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
}
}
- CompletableFuture obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType)
- {
+ CompletableFuture obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType) {
this.throwIfClosed(null);
return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, null, entityType);
}
- CompletableFuture obtainRequestResponseLinkAsync(String entityPath, String transferDestinationPath, MessagingEntityType entityType)
- {
+ CompletableFuture obtainRequestResponseLinkAsync(String entityPath, String transferDestinationPath, MessagingEntityType entityType) {
this.throwIfClosed(null);
return this.managementLinksCache.obtainRequestResponseLinkAsync(entityPath, transferDestinationPath, entityType);
}
- void releaseRequestResponseLink(String entityPath)
- {
- if(!this.getIsClosed())
- {
+ void releaseRequestResponseLink(String entityPath) {
+ if (!this.getIsClosed()) {
this.managementLinksCache.releaseRequestResponseLink(entityPath, null);
}
}
- void releaseRequestResponseLink(String entityPath, String transferDestinationPath)
- {
- if(!this.getIsClosed())
- {
+ void releaseRequestResponseLink(String entityPath, String transferDestinationPath) {
+ if (!this.getIsClosed()) {
this.managementLinksCache.releaseRequestResponseLink(entityPath, transferDestinationPath);
}
}
- private void createCBSLinkAsync()
- {
- if(this.getIsClosingOrClosed())
- {
+ private void createCBSLinkAsync() {
+ if (this.getIsClosingOrClosed()) {
return;
}
- if(++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS )
- {
+ if (++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS) {
Throwable completionEx = this.lastCBSLinkCreationException == null ? new Exception("CBS link creation failed multiple times.") : this.lastCBSLinkCreationException;
this.cbsLinkCreationFuture.completeExceptionally(completionEx);
- }
- else
- {
+ } else {
String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
- RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null).handleAsync((cbsLink, ex) ->
- {
- if(ex == null)
- {
+ RequestResponseLink.createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null, null, null).handleAsync((cbsLink, ex) -> {
+ if (ex == null) {
TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath);
- if(this.getIsClosingOrClosed())
- {
+ if (this.getIsClosingOrClosed()) {
// Factory is closed before CBSLink could be created. Close the created CBS link too
cbsLink.closeAsync();
- }
- else
- {
+ } else {
this.cbsLink = cbsLink;
this.cbsLinkCreationFuture.complete(null);
}
- }
- else
- {
+ } else {
this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex);
TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts);
this.createCBSLinkAsync();
diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java
index 1e297d4c64be..4ff49ee80a33 100644
--- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java
+++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/MiscRequestResponseOperationHandler.java
@@ -3,7 +3,11 @@
package com.microsoft.azure.servicebus.primitives;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import com.microsoft.azure.servicebus.TransactionContext;
@@ -14,8 +18,7 @@
import com.microsoft.azure.servicebus.rules.RuleDescription;
-public final class MiscRequestResponseOperationHandler extends ClientEntity
-{
+public final class MiscRequestResponseOperationHandler extends ClientEntity {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MiscRequestResponseOperationHandler.class);
private final Object requestResonseLinkCreationLock = new Object();
@@ -25,8 +28,7 @@ public final class MiscRequestResponseOperationHandler extends ClientEntity
private RequestResponseLink requestResponseLink;
private CompletableFuture requestResponseLinkCreationFuture;
- private MiscRequestResponseOperationHandler(MessagingFactory factory, String linkName, String entityPath, MessagingEntityType entityType)
- {
+ private MiscRequestResponseOperationHandler(MessagingFactory factory, String linkName, String entityPath, MessagingEntityType entityType) {
super(linkName);
this.underlyingFactory = factory;
@@ -35,19 +37,16 @@ private MiscRequestResponseOperationHandler(MessagingFactory factory, String lin
}
@Deprecated
- public static CompletableFuture create(MessagingFactory factory, String entityPath)
- {
- return create(factory, entityPath, null);
+ public static CompletableFuture create(MessagingFactory factory, String entityPath) {
+ return create(factory, entityPath, null);
}
- public static CompletableFuture create(MessagingFactory factory, String entityPath, MessagingEntityType entityType)
- {
+ public static CompletableFuture create(MessagingFactory factory, String entityPath, MessagingEntityType entityType) {
MiscRequestResponseOperationHandler requestResponseOperationHandler = new MiscRequestResponseOperationHandler(factory, StringUtil.getShortRandomString(), entityPath, entityType);
return CompletableFuture.completedFuture(requestResponseOperationHandler);
}
- private void closeInternals()
- {
+ private void closeInternals() {
this.closeRequestResponseLink();
}
@@ -58,26 +57,19 @@ protected CompletableFuture onClose() {
return CompletableFuture.completedFuture(null);
}
- private CompletableFuture createRequestResponseLink()
- {
+ private CompletableFuture createRequestResponseLink() {
synchronized (this.requestResonseLinkCreationLock) {
- if(this.requestResponseLinkCreationFuture == null)
- {
+ if (this.requestResponseLinkCreationFuture == null) {
this.requestResponseLinkCreationFuture = new CompletableFuture();
- this.underlyingFactory.obtainRequestResponseLinkAsync(this.entityPath, this.entityType).handleAsync((rrlink, ex) ->
- {
- if(ex == null)
- {
+ this.underlyingFactory.obtainRequestResponseLinkAsync(this.entityPath, this.entityType).handleAsync((rrlink, ex) -> {
+ if (ex == null) {
this.requestResponseLink = rrlink;
this.requestResponseLinkCreationFuture.complete(null);
- }
- else
- {
+ } else {
Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex);
this.requestResponseLinkCreationFuture.completeExceptionally(cause);
// Set it to null so next call will retry rr link creation
- synchronized (this.requestResonseLinkCreationLock)
- {
+ synchronized (this.requestResonseLinkCreationLock) {
this.requestResponseLinkCreationFuture = null;
}
}
@@ -89,12 +81,9 @@ private CompletableFuture createRequestResponseLink()
}
}
- private void closeRequestResponseLink()
- {
- synchronized (this.requestResonseLinkCreationLock)
- {
- if(this.requestResponseLinkCreationFuture != null)
- {
+ private void closeRequestResponseLink() {
+ synchronized (this.requestResonseLinkCreationLock) {
+ if (this.requestResponseLinkCreationFuture != null) {
this.requestResponseLinkCreationFuture.thenRun(() -> {
this.underlyingFactory.releaseRequestResponseLink(this.entityPath);
this.requestResponseLink = null;
@@ -104,16 +93,14 @@ private void closeRequestResponseLink()
}
}
- public CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime, int skip, int top, String lastSessionId)
- {
+ public CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime, int skip, int top, String lastSessionId) {
TRACE_LOGGER.debug("Getting message sessions from entity '{}' with lastupdatedtime '{}', skip '{}', top '{}', lastsessionid '{}'", this.entityPath, lastUpdatedTime, skip, top, lastSessionId);
return this.createRequestResponseLink().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LAST_UPDATED_TIME, lastUpdatedTime);
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SKIP, skip);
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_TOP, top);
- if(lastSessionId != null)
- {
+ if (lastSessionId != null) {
requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LAST_SESSION_ID, lastSessionId);
}
@@ -122,22 +109,17 @@ public CompletableFuture> getMessageSessionsAsync(Date l
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture> returningFuture = new CompletableFuture>();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
- if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
- {
+ if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
Map responseBodyMap = RequestResponseUtils.getResponseBody(responseMessage);
- int responseSkip = (int)responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SKIP);
- String[] sessionIds = (String[])responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSIONIDS);
+ int responseSkip = (int) responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SKIP);
+ String[] sessionIds = (String[]) responseBodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSIONIDS);
TRACE_LOGGER.debug("Received '{}' sessions from entity '{}'. Response skip '{}'", sessionIds.length, this.entityPath, responseSkip);
returningFuture.complete(new Pair<>(sessionIds, responseSkip));
- }
- else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE ||
- (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.SESSION_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage))))
- {
+ } else if (statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE
+ || (statusCode == ClientConstants.REQUEST_RESPONSE_NOTFOUND_STATUS_CODE && ClientConstants.SESSION_NOT_FOUND_ERROR.equals(RequestResponseUtils.getResponseErrorCondition(responseMessage)))) {
TRACE_LOGGER.debug("Received no sessions from entity '{}'.", this.entityPath);
returningFuture.complete(new Pair<>(new String[0], 0));
- }
- else
- {
+ } else {
// error response
TRACE_LOGGER.debug("Receiving sessions from entity '{}' failed with status code '{}'", this.entityPath, statusCode);
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
@@ -147,8 +129,7 @@ else if(statusCode == ClientConstants.REQUEST_RESPONSE_NOCONTENT_STATUS_CODE ||
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- public CompletableFuture removeRuleAsync(String ruleName)
- {
+ public CompletableFuture removeRuleAsync(String ruleName) {
TRACE_LOGGER.debug("Removing rule '{}' from entity '{}'", ruleName, this.entityPath);
return this.createRequestResponseLink().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
@@ -159,13 +140,10 @@ public CompletableFuture removeRuleAsync(String ruleName)
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture returningFuture = new CompletableFuture();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
- if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
- {
+ if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
TRACE_LOGGER.debug("Removed rule '{}' from entity '{}'", ruleName, this.entityPath);
returningFuture.complete(null);
- }
- else
- {
+ } else {
// error response
TRACE_LOGGER.error("Removing rule '{}' from entity '{}' failed with status code '{}'", ruleName, this.entityPath, statusCode);
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
@@ -175,8 +153,7 @@ public CompletableFuture removeRuleAsync(String ruleName)
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- public CompletableFuture addRuleAsync(RuleDescription ruleDescription)
- {
+ public CompletableFuture addRuleAsync(RuleDescription ruleDescription) {
TRACE_LOGGER.debug("Adding rule '{}' to entity '{}'", ruleDescription.getName(), this.entityPath);
return this.createRequestResponseLink().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
@@ -188,13 +165,10 @@ public CompletableFuture addRuleAsync(RuleDescription ruleDescription)
return responseFuture.thenComposeAsync((responseMessage) -> {
CompletableFuture returningFuture = new CompletableFuture();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
- if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
- {
+ if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
TRACE_LOGGER.debug("Added rule '{}' to entity '{}'", ruleDescription.getName(), this.entityPath);
returningFuture.complete(null);
- }
- else
- {
+ } else {
// error response
TRACE_LOGGER.error("Adding rule '{}' to entity '{}' failed with status code '{}'", ruleDescription.getName(), this.entityPath, statusCode);
returningFuture.completeExceptionally(RequestResponseUtils.genereateExceptionFromResponse(responseMessage));
@@ -204,8 +178,7 @@ public CompletableFuture addRuleAsync(RuleDescription ruleDescription)
}, MessagingFactory.INTERNAL_THREAD_POOL);
}
- public CompletableFuture> getRulesAsync(int skip, int top)
- {
+ public CompletableFuture> getRulesAsync(int skip, int top) {
TRACE_LOGGER.debug("Fetching rules for entity '{}'", this.entityPath);
return this.createRequestResponseLink().thenComposeAsync((v) -> {
HashMap requestBodyMap = new HashMap();
@@ -222,25 +195,19 @@ public CompletableFuture> getRulesAsync(int skip, in
Collection rules = new ArrayList();
int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage);
- if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE)
- {
+ if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) {
Map responseBodyMap = RequestResponseUtils.getResponseBody(responseMessage);
- ArrayList