Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ public class WebSocketConnectionHandler extends ConnectionHandler {

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);

public WebSocketConnectionHandler(IAmqpConnection messagingFactory)
{
public WebSocketConnectionHandler(IAmqpConnection messagingFactory) {
super(messagingFactory);
}

@Override
public void addTransportLayers(final Event event, final TransportInternal transport)
{
public void addTransportLayers(final Event event, final TransportInternal transport) {
final String hostName = event.getConnection().getHostname();

final WebSocketImpl webSocket = new WebSocketImpl();
Expand All @@ -37,21 +35,19 @@ public void addTransportLayers(final Event event, final TransportInternal transp
transport.addTransportLayer(webSocket);

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]");
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName + "]");
}

super.addTransportLayers(event, transport);
}

@Override
public int getProtocolPort()
{
public int getProtocolPort() {
return ClientConstants.HTTPS_PORT;
}

@Override
public int getMaxFrameSize()
{
public int getMaxFrameSize() {
// This is the current limitation of https://github.com/Azure/qpid-proton-j-extensions
// once, this library enables larger frames - this property can be removed.
return AmqpConstants.WEBSOCKET_MAX_FRAME_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
* This class can be used to format the path for different Service Bus entity types.
*/
public class EntityNameHelper {
private static final String pathDelimiter = "/";
private static final String subscriptionsSubPath = "Subscriptions";
private static final String rulesSubPath = "Rules";
private static final String subQueuePrefix = "$";
private static final String deadLetterQueueSuffix = "DeadLetterQueue";
private static final String deadLetterQueueName = subQueuePrefix + deadLetterQueueSuffix;
private static final String transfer = "Transfer";
private static final String transferDeadLetterQueueName = subQueuePrefix + transfer + pathDelimiter + deadLetterQueueName;
private static final String PATH_DELIMITER = "/";
private static final String SUBSCRIPTIONS_SUB_PATH = "Subscriptions";
private static final String RULES_SUB_PATH = "Rules";
private static final String SUB_QUEUE_PREFIX = "$";
private static final String DEAD_LETTER_QUEUE_SUFFIX = "DeadLetterQueue";
private static final String DEAD_LETTER_QUEUE_NAME = SUB_QUEUE_PREFIX + DEAD_LETTER_QUEUE_SUFFIX;
private static final String TRANSFER = "Transfer";
private static final String TRANSFER_DEAD_LETTER_QUEUE_NAME = SUB_QUEUE_PREFIX + TRANSFER + PATH_DELIMITER + DEAD_LETTER_QUEUE_NAME;

/**
* Formats the dead letter path for either a queue, or a subscription.
* @param entityPath - The name of the queue, or path of the subscription.
* @return - The path as a String of the dead letter entity.
*/
public static String formatDeadLetterPath(String entityPath) {
return formatSubQueuePath(entityPath, deadLetterQueueName);
return formatSubQueuePath(entityPath, DEAD_LETTER_QUEUE_NAME);
}

/**
Expand All @@ -32,7 +32,7 @@ public static String formatDeadLetterPath(String entityPath) {
* @return The path of the subscription.
*/
public static String formatSubscriptionPath(String topicPath, String subscriptionName) {
return String.join(pathDelimiter, topicPath, subscriptionsSubPath, subscriptionName);
return String.join(PATH_DELIMITER, topicPath, SUBSCRIPTIONS_SUB_PATH, subscriptionName);
}

/**
Expand All @@ -43,11 +43,11 @@ public static String formatSubscriptionPath(String topicPath, String subscriptio
* @return The path of the rule
*/
public static String formatRulePath(String topicPath, String subscriptionName, String ruleName) {
return String.join(pathDelimiter,
return String.join(PATH_DELIMITER,
topicPath,
subscriptionsSubPath,
SUBSCRIPTIONS_SUB_PATH,
subscriptionName,
rulesSubPath,
RULES_SUB_PATH,
ruleName);
}

Expand All @@ -57,11 +57,11 @@ public static String formatRulePath(String topicPath, String subscriptionName, S
* @return The path of the transfer dead letter sub-queue for the entity
*/
public static String formatTransferDeadLetterPath(String entityPath) {
return String.join(pathDelimiter, entityPath, transferDeadLetterQueueName);
return String.join(PATH_DELIMITER, entityPath, TRANSFER_DEAD_LETTER_QUEUE_NAME);
}

static String formatSubQueuePath(String entityPath, String subQueueName) {
return entityPath + EntityNameHelper.pathDelimiter + subQueueName;
return entityPath + EntityNameHelper.PATH_DELIMITER + subQueueName;
}

static void checkValidQueueName(String queueName) {
Expand Down Expand Up @@ -90,18 +90,18 @@ private static void checkValidEntityName(String entityName, int maxEntityNameLen
throw new IllegalArgumentException("Entity path " + entityName + " exceeds the " + maxEntityNameLength + " character limit");
}

if (tempName.startsWith(pathDelimiter) || tempName.endsWith(pathDelimiter)) {
if (tempName.startsWith(PATH_DELIMITER) || tempName.endsWith(PATH_DELIMITER)) {
throw new IllegalArgumentException("The entity name cannot contain '/' as prefix or suffix.");
}

if (!allowSeparator && tempName.contains(pathDelimiter)) {
throw new IllegalArgumentException("The entity name contains an invalid character '" + pathDelimiter + "'");
if (!allowSeparator && tempName.contains(PATH_DELIMITER)) {
throw new IllegalArgumentException("The entity name contains an invalid character '" + PATH_DELIMITER + "'");
}

for (char key : ManagementClientConstants.InvalidEntityPathCharacters) {
if (entityName.indexOf(key) >= 0) {
throw new IllegalArgumentException(entityName + " contains character '" + key + "' which is not allowed" +
"because it is reserved in the Uri scheme.");
throw new IllegalArgumentException(entityName + " contains character '" + key + "' which is not allowed"
+ "because it is reserved in the Uri scheme.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.Utils;
import com.microsoft.azure.servicebus.primitives.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.rules.RuleDescription;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@
package com.microsoft.azure.servicebus.management;

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.primitives.*;
import com.microsoft.azure.servicebus.primitives.AuthorizationFailedException;
import com.microsoft.azure.servicebus.primitives.ClientConstants;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsException;
import com.microsoft.azure.servicebus.primitives.MessagingEntityNotFoundException;
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
import com.microsoft.azure.servicebus.primitives.QuotaExceededException;
import com.microsoft.azure.servicebus.primitives.ServerBusyException;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.rules.RuleDescription;
import com.microsoft.azure.servicebus.security.SecurityToken;
import com.microsoft.azure.servicebus.security.TokenProvider;
import org.asynchttpclient.*;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;
import org.asynchttpclient.util.HttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.*;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.xml.parsers.DocumentBuilder;
Expand Down Expand Up @@ -72,8 +90,8 @@ public ManagementClientAsync(URI namespaceEndpointURI, ClientSettings clientSett
this.namespaceEndpointURI = namespaceEndpointURI;
this.clientSettings = clientSettings;
DefaultAsyncHttpClientConfig.Builder clientBuilder = Dsl.config()
.setConnectTimeout((int)CONNECTION_TIMEOUT.toMillis())
.setRequestTimeout((int)this.clientSettings.getOperationTimeout().toMillis());
.setConnectTimeout((int) CONNECTION_TIMEOUT.toMillis())
.setRequestTimeout((int) this.clientSettings.getOperationTimeout().toMillis());
this.asyncHttpClient = asyncHttpClient(clientBuilder);
}

Expand Down Expand Up @@ -977,7 +995,7 @@ private CompletableFuture<Void> deleteEntityAsync(String path) {
return exceptionFuture;
}

return sendManagementHttpRequestAsync(HttpConstants.Methods.DELETE, entityURL, null, null).thenAccept(c -> {});
return sendManagementHttpRequestAsync(HttpConstants.Methods.DELETE, entityURL, null, null).thenAccept(c -> { });
}

/**
Expand All @@ -990,7 +1008,7 @@ public void close() throws IOException {

private static URL getManagementURL(URI namespaceEndpontURI, String entityPath, String query) throws ServiceBusException {
try {
URI httpURI = new URI("https", null, namespaceEndpontURI.getHost(), getPortNumberFromHost(namespaceEndpontURI.getHost()), "/"+entityPath, query, null);
URI httpURI = new URI("https", null, namespaceEndpontURI.getHost(), getPortNumberFromHost(namespaceEndpontURI.getHost()), "/" + entityPath, query, null);
return httpURI.toURL();
} catch (URISyntaxException | MalformedURLException e) {
throw new ServiceBusException(false, e);
Expand Down Expand Up @@ -1027,8 +1045,7 @@ private CompletableFuture<String> sendManagementHttpRequestAsync(String httpMeth

CompletableFuture<String> outputFuture = new CompletableFuture<>();
listenableFuture.toCompletableFuture()
.handleAsync((response, ex) ->
{
.handleAsync((response, ex) -> {
if (ex != null) {
outputFuture.completeExceptionally(ex);
} else {
Expand Down Expand Up @@ -1057,8 +1074,7 @@ private static void validateHttpResponse(Request request, Response response) thr
}

ServiceBusException exception = null;
switch (response.getStatusCode())
{
switch (response.getStatusCode()) {
case 401: /*UnAuthorized*/
exception = new AuthorizationFailedException(exceptionMessage);
break;
Expand Down Expand Up @@ -1092,8 +1108,7 @@ private static void validateHttpResponse(Request request, Response response) thr
if (exceptionMessage.contains(ManagementClientConstants.ForbiddenInvalidOperationSubCode)) {
//todo: log
throw new UnsupportedOperationException(exceptionMessage);
}
else {
} else {
exception = new QuotaExceededException(exceptionMessage);
}
break;
Expand Down Expand Up @@ -1145,18 +1160,15 @@ private static String parseDetailIfAvailable(String content) {
return null;
}

private static String getSecurityToken(TokenProvider tokenProvider, String url ) throws InterruptedException, ExecutionException {
private static String getSecurityToken(TokenProvider tokenProvider, String url) throws InterruptedException, ExecutionException {
SecurityToken token = tokenProvider.getSecurityTokenAsync(url).get();
return token.getTokenValue();
}

private static int getPortNumberFromHost(String host) {
if(host.endsWith("onebox.windows-int.net"))
{
if (host.endsWith("onebox.windows-int.net")) {
return ONE_BOX_HTTPS_PORT;
}
else
{
} else {
return -1;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ public class QueueRuntimeInfo extends EntityRuntimeInfo {
private long messageCount;
private long sizeInBytes;

QueueRuntimeInfo(String path)
{
QueueRuntimeInfo(String path) {
this.path = path;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,33 @@
// Fields that require special serializations
public class SerializerUtil {

public static String serializeDuration(Duration duration)
{
if(duration == null || duration.isNegative() || duration.isZero())
{
public static String serializeDuration(Duration duration) {
if (duration == null || duration.isNegative() || duration.isZero()) {
return "";
}
Duration remainingTime = duration;
StringBuffer sb = new StringBuffer("P");
long days = remainingTime.toDays();
if(days > 0)
{
if (days > 0) {
sb.append(days).append("D");
remainingTime = duration.minusDays(days);
}
if(!remainingTime.isZero())
{
if (!remainingTime.isZero()) {
sb.append("T");
long hours = remainingTime.toHours();
if(hours > 0)
{
if (hours > 0) {
sb.append(hours).append("H");
remainingTime = duration.minusHours(hours);
}

long minutes = remainingTime.toMinutes();
if(minutes > 0)
{
if (minutes > 0) {
sb.append(minutes).append("M");
remainingTime = duration.minusMinutes(minutes);
}

long seconds = remainingTime.getSeconds();
if(seconds > 0)
{
if (seconds > 0) {
sb.append(seconds).append("S");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
import com.microsoft.azure.servicebus.security.SecurityConstants;

import java.security.SecureRandom;
import java.util.*;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;

public class SharedAccessAuthorizationRule extends AuthorizationRule {
static int SUPPORTED_SAS_KEY_LENGTH = 44;
static String FIXED_CLAIM_TYPE = "SharedAccessKey";
static final int SUPPORTED_SAS_KEY_LENGTH = 44;
static final String FIXED_CLAIM_TYPE = "SharedAccessKey";

private String keyName;
private String primaryKey;
Expand All @@ -37,7 +39,7 @@ public SharedAccessAuthorizationRule(String keyName, String primaryKey, String s

@Override
public String getClaimType() {
return SharedAccessAuthorizationRule.FIXED_CLAIM_TYPE;
return SharedAccessAuthorizationRule.FIXED_CLAIM_TYPE ;
}

@Override
Expand Down Expand Up @@ -135,8 +137,8 @@ public boolean equals(Object o) {
return false;
}

if ((this.rights != null && other.rights == null) ||
this.rights == null && other.rights != null) {
if ((this.rights != null && other.rights == null)
|| this.rights == null && other.rights != null) {
return false;
}

Expand Down
Loading