diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java index b43190a27cbd..80fa786360e6 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/Message.java @@ -11,7 +11,7 @@ import java.util.Map; import java.util.UUID; -final public class Message implements Serializable, IMessage { +public final class Message implements Serializable, IMessage { private static final long serialVersionUID = 7849508139219590863L; private static final Charset DEFAULT_CHAR_SET = Charset.forName("UTF-8"); private static final String DEFAULT_CONTENT_TYPE = null; @@ -63,8 +63,7 @@ final public class Message implements Serializable, IMessage { /** * Creates an empty message with an empty byte array as body. */ - public Message() - { + public Message() { this(DEFAULT_CONTENT); } @@ -72,8 +71,7 @@ public Message() * Creates a message from a string. For backward compatibility reasons, the string is converted to a byte array and message body type is set to binary. * @param content content of the message. */ - public Message(String content) - { + public Message(String content) { this(content.getBytes(DEFAULT_CHAR_SET)); } @@ -81,8 +79,7 @@ public Message(String content) * Creates a message from a byte array. Message body type is set to binary. * @param content content of the message */ - public Message(byte[] content) - { + public Message(byte[] content) { this(Utils.fromBinay(content)); } @@ -90,8 +87,7 @@ public Message(byte[] content) * Creates a message from message body. * @param body message body */ - public Message(MessageBody body) - { + public Message(MessageBody body) { this(body, DEFAULT_CONTENT_TYPE); } @@ -100,8 +96,7 @@ public Message(MessageBody body) * @param content content of the message * @param contentType content type of the message */ - public Message(String content, String contentType) - { + public Message(String content, String contentType) { this(content.getBytes(DEFAULT_CHAR_SET), contentType); } @@ -110,8 +105,7 @@ public Message(String content, String contentType) * @param content content of the message * @param contentType content type of the message */ - public Message(byte[] content, String contentType) - { + public Message(byte[] content, String contentType) { this(Utils.fromBinay(content), contentType); } @@ -120,8 +114,7 @@ public Message(byte[] content, String contentType) * @param body message body * @param contentType content type of the message */ - public Message(MessageBody body, String contentType) - { + public Message(MessageBody body, String contentType) { this(UUID.randomUUID().toString(), body, contentType); } @@ -131,8 +124,7 @@ public Message(MessageBody body, String contentType) * @param content content of the message * @param contentType content type of the message */ - public Message(String messageId, String content, String contentType) - { + public Message(String messageId, String content, String contentType) { this(messageId, content.getBytes(DEFAULT_CHAR_SET), contentType); } @@ -142,8 +134,7 @@ public Message(String messageId, String content, String contentType) * @param content content of the message * @param contentType content type of the message */ - public Message(String messageId, byte[] content, String contentType) - { + public Message(String messageId, byte[] content, String contentType) { this(messageId, Utils.fromBinay(content), contentType); } @@ -153,8 +144,7 @@ public Message(String messageId, byte[] content, String contentType) * @param body message body * @param contentType content type of the message */ - public Message(String messageId, MessageBody body, String contentType) - { + public Message(String messageId, MessageBody body, String contentType) { this.messageId = messageId; this.messageBody = body; this.contentType = contentType; @@ -269,7 +259,7 @@ public String getTo() { @Override public void setTo(String to) { - this.to= to; + this.to = to; } @Override @@ -358,43 +348,37 @@ public UUID getLockToken() { return this.lockToken; } - void setLockToken(UUID lockToken){ + void setLockToken(UUID lockToken) { this.lockToken = lockToken; } - byte[] getDeliveryTag() - { + byte[] getDeliveryTag() { return this.deliveryTag; } - void setDeliveryTag(byte[] deliveryTag) - { + void setDeliveryTag(byte[] deliveryTag) { this.deliveryTag = deliveryTag; } @Override @Deprecated - public byte[] getBody() - { + public byte[] getBody() { return Utils.getDataFromMessageBody(this.messageBody); } @Override @Deprecated - public void setBody(byte[] body) - { + public void setBody(byte[] body) { this.messageBody = Utils.fromBinay(body); } @Override - public MessageBody getMessageBody() - { + public MessageBody getMessageBody() { return this.messageBody; } @Override - public void setMessageBody(MessageBody body) - { + public void setMessageBody(MessageBody body) { this.messageBody = body; } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java index 78618708128e..cc48ac6ff2f3 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageAndSessionPump.java @@ -51,7 +51,7 @@ class MessageAndSessionPump extends InitializableEntity implements IMessageAndSe private int prefetchCount; private ExecutorService customCodeExecutor; - public MessageAndSessionPump(MessagingFactory factory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) { + MessageAndSessionPump(MessagingFactory factory, String entityPath, MessagingEntityType entityType, ReceiveMode receiveMode) { super(StringUtil.getShortRandomString()); this.factory = factory; this.entityPath = entityPath; @@ -89,8 +89,7 @@ public void registerMessageHandler(IMessageHandler handler, MessageHandlerOption this.innerReceiver = ClientFactory.createMessageReceiverFromEntityPath(this.factory, this.entityPath, this.entityType, this.receiveMode); TRACE_LOGGER.info("Created MessageReceiver to entity '{}'", this.entityPath); - if(this.prefetchCount != UNSET_PREFETCH_COUNT) - { + if (this.prefetchCount != UNSET_PREFETCH_COUNT) { this.innerReceiver.setPrefetchCount(this.prefetchCount); } for (int i = 0; i < handlerOptions.getMaxConcurrentCalls(); i++) { @@ -129,10 +128,8 @@ public void registerSessionHandler(ISessionHandler handler, SessionHandlerOption } } - private static void assertNonNulls(Object handler, Object options, ExecutorService executorService) - { - if(handler == null || options == null || executorService == null) - { + private static void assertNonNulls(Object handler, Object options, ExecutorService executorService) { + if (handler == null || options == null || executorService == null) { throw new IllegalArgumentException("None of the arguments can be null."); } } @@ -185,8 +182,7 @@ private void receiveAndPumpMessage() { } // Some clients are returning null from the call - if(onMessageFuture == null) - { + if (onMessageFuture == null) { onMessageFuture = COMPLETED_FUTURE; } @@ -215,13 +211,10 @@ private void receiveAndPumpMessage() { } else { // Abandon message dispositionPhase = ExceptionPhase.ABANDON; - if(this.messageHandlerOptions.isAutoComplete()) - { + if (this.messageHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); updateDispositionFuture = this.innerReceiver.abandonAsync(message.getLockToken()); - } - else - { + } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } } @@ -275,8 +268,7 @@ private void acceptSessionAndPumpMessages() { } else { // Received a session.. Now pump messages.. TRACE_LOGGER.debug("Accepted a session '{}' from entity '{}'", session.getSessionId(), this.entityPath); - if(this.prefetchCount != UNSET_PREFETCH_COUNT) - { + if (this.prefetchCount != UNSET_PREFETCH_COUNT) { try { session.setPrefetchCount(this.prefetchCount); } catch (ServiceBusException e) { @@ -325,11 +317,12 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { sessionTracker.notifyMessageReceived(); // There is no need to renew message locks as session messages are locked for a day ScheduledFuture renewCancelTimer = Timer.schedule(() -> { - TRACE_LOGGER.warn("onMessage task timed out. Cancelling loop to renew lock on session '{}'", session.getSessionId()); - sessionTracker.sessionRenewLockLoop.cancelLoop(); - }, - this.sessionHandlerOptions.getMaxAutoRenewDuration(), - TimerType.OneTimeRun); + TRACE_LOGGER.warn("onMessage task timed out. Cancelling loop to renew lock on session '{}'", session.getSessionId()); + sessionTracker.sessionRenewLockLoop.cancelLoop(); + }, + this.sessionHandlerOptions.getMaxAutoRenewDuration(), + TimerType.OneTimeRun); + TRACE_LOGGER.debug("Invoking onMessage with message containing sequence number '{}'", message.getSequenceNumber()); CompletableFuture onMessageFuture; try { @@ -341,8 +334,7 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { } // Some clients are returning null from the call - if(onMessageFuture == null) - { + if (onMessageFuture == null) { onMessageFuture = COMPLETED_FUTURE; } @@ -368,13 +360,10 @@ private void receiveFromSessionAndPumpMessage(SessionTracker sessionTracker) { } else { // Abandon message dispositionPhase = ExceptionPhase.ABANDON; - if (this.sessionHandlerOptions.isAutoComplete()) - { + if (this.sessionHandlerOptions.isAutoComplete()) { TRACE_LOGGER.debug("Abandoning message with sequence number '{}'", message.getSequenceNumber()); updateDispositionFuture = session.abandonAsync(message.getLockToken()); - } - else - { + } else { updateDispositionFuture = CompletableFuture.completedFuture(null); } } @@ -450,7 +439,7 @@ synchronized void notifyMessageReceived() { synchronized CompletableFuture shouldRetryOnNoMessageOrException() { if (this.retryFuture == null || this.retryFuture.isDone()) { - this.retryFuture = new CompletableFuture(); + this.retryFuture = new CompletableFuture<>(); } this.waitingRetryThreads++; if (this.waitingRetryThreads == this.numberReceivingThreads) { @@ -459,23 +448,23 @@ synchronized CompletableFuture shouldRetryOnNoMessageOrException() { // close current session and accept another session ScheduledFuture renewCancelTimer = Timer.schedule(() -> { - TRACE_LOGGER.warn("Closing session timed out. Cancelling loop to renew lock on session '{}'", this.session.getSessionId()); - SessionTracker.this.sessionRenewLockLoop.cancelLoop(); - }, - this.messageAndSessionPump.sessionHandlerOptions.getMaxAutoRenewDuration(), - TimerType.OneTimeRun); + TRACE_LOGGER.warn("Closing session timed out. Cancelling loop to renew lock on session '{}'", this.session.getSessionId()); + SessionTracker.this.sessionRenewLockLoop.cancelLoop(); + }, + this.messageAndSessionPump.sessionHandlerOptions.getMaxAutoRenewDuration(), + TimerType.OneTimeRun); + CompletableFuture onCloseFuture; try { onCloseFuture = COMPLETED_FUTURE.thenComposeAsync((v) -> this.messageAndSessionPump.sessionHandler.OnCloseSessionAsync(session), this.messageAndSessionPump.customCodeExecutor); } catch (Exception onCloseSyncEx) { TRACE_LOGGER.error("Invocation of onCloseSession on session '{}' threw unexpected exception", this.session.getSessionId(), onCloseSyncEx); - onCloseFuture = new CompletableFuture(); + onCloseFuture = new CompletableFuture<>(); onCloseFuture.completeExceptionally(onCloseSyncEx); } // Some clients are returning null from the call - if(onCloseFuture == null) - { + if (onCloseFuture == null) { onCloseFuture = COMPLETED_FUTURE; } @@ -489,8 +478,7 @@ synchronized CompletableFuture shouldRetryOnNoMessageOrException() { this.sessionRenewLockLoop.cancelLoop(); TRACE_LOGGER.debug("Cancelled loop to renew lock on session '{}'", this.session.getSessionId()); - this.session.closeAsync().handleAsync((z, closeEx) -> - { + this.session.closeAsync().handleAsync((z, closeEx) -> { if (closeEx != null) { closeEx = ExceptionUtil.extractAsyncCompletionCause(closeEx); TRACE_LOGGER.info("Closing session '{}' from entity '{}' failed", this.session.getSessionId(), this.messageAndSessionPump.entityPath, closeEx); @@ -583,8 +571,7 @@ protected void loop() { if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { TRACE_LOGGER.debug("Renewing lock on '{}'", this.messageIdentifier); - this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) -> - { + this.innerReceiver.renewMessageLockAsync(message).handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); TRACE_LOGGER.error("Renewing lock on '{}' failed", this.messageIdentifier, renewLockEx); @@ -638,8 +625,7 @@ protected void loop() { if (renewInterval != null && !renewInterval.isNegative()) { this.timerFuture = Timer.schedule(() -> { TRACE_LOGGER.debug("Renewing lock on '{}'", this.sessionIdentifier); - this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> - { + this.session.renewSessionLockAsync().handleAsync((v, renewLockEx) -> { if (renewLockEx != null) { renewLockEx = ExceptionUtil.extractAsyncCompletionCause(renewLockEx); TRACE_LOGGER.error("Renewing lock on '{}' failed", this.sessionIdentifier, renewLockEx); @@ -847,13 +833,13 @@ private void checkInnerReceiveCreated() { // Don't notify handler if the pump is already closed private void notifyExceptionToSessionHandler(Throwable ex, ExceptionPhase phase) { if (!(ex instanceof IllegalStateException && this.getIsClosingOrClosed())) { - this.customCodeExecutor.execute(() -> {this.sessionHandler.notifyException(ex, phase);}); + this.customCodeExecutor.execute(() -> this.sessionHandler.notifyException(ex, phase)); } } private void notifyExceptionToMessageHandler(Throwable ex, ExceptionPhase phase) { if (!(ex instanceof IllegalStateException && this.getIsClosingOrClosed())) { - this.customCodeExecutor.execute(() -> {this.messageHandler.notifyException(ex, phase);}); + this.customCodeExecutor.execute(() -> this.messageHandler.notifyException(ex, phase)); } } @@ -864,27 +850,21 @@ public int getPrefetchCount() { @Override public void setPrefetchCount(int prefetchCount) throws ServiceBusException { - if(prefetchCount < 0) - { + if (prefetchCount < 0) { throw new IllegalArgumentException("Prefetch count cannot be negative."); } this.prefetchCount = prefetchCount; - if(this.innerReceiver != null) - { + if (this.innerReceiver != null) { this.innerReceiver.setPrefetchCount(prefetchCount); } // For accepted session receivers also IMessageSession[] currentAcceptedSessions = this.openSessions.values().toArray(new IMessageSession[0]); - for(IMessageSession session : currentAcceptedSessions) - { - try - { + for (IMessageSession session : currentAcceptedSessions) { + try { session.setPrefetchCount(prefetchCount); - } - catch(IllegalStateException ise) - { + } catch (IllegalStateException ise) { // Session might have been closed.. Ignore the exception as this is a best effort setter on already accepted sessions } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java index 00b955bc0f08..c85b3e744edd 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBody.java @@ -12,7 +12,7 @@ * Client should test for body type before calling corresponding get method. * Get methods not corresponding to the type of the body return null. */ -public class MessageBody implements Serializable{ +public class MessageBody implements Serializable { private static final long serialVersionUID = 7215009530928988502L; @@ -21,17 +21,15 @@ public class MessageBody implements Serializable{ private List> sequenceData; private List binaryData; - private MessageBody() {} + private MessageBody() { } /** * Creates message body of AMQPValue type. * @param value AMQPValue content of the message. It must be of a type supported by AMQP. * @return MessageBody instance wrapping around the value data. */ - public static MessageBody fromValueData(Object value) - { - if(value == null) - { + public static MessageBody fromValueData(Object value) { + if (value == null) { throw new IllegalArgumentException("Value data is null."); } @@ -49,10 +47,8 @@ public static MessageBody fromValueData(Object value) * @param sequenceData a list of AMQPSequence sections. Each AMQPSequence section is in turn a list of objects. Every object in each list must of a type supported by AMQP. * @return MessageBody instance wrapping around the sequence data. */ - public static MessageBody fromSequenceData(List> sequenceData) - { - if(sequenceData == null || sequenceData.size() == 0 || sequenceData.size() > 1) - { + public static MessageBody fromSequenceData(List> sequenceData) { + if (sequenceData == null || sequenceData.size() == 0 || sequenceData.size() > 1) { throw new IllegalArgumentException("Sequence data is null or has more than one collection in it."); } @@ -70,10 +66,8 @@ public static MessageBody fromSequenceData(List> sequenceData) * @param binaryData a list of byte arrays. * @return MessageBody instance wrapping around the binary data. */ - public static MessageBody fromBinaryData(List binaryData) - { - if(binaryData == null || binaryData.size() == 0 || binaryData.size() > 1) - { + public static MessageBody fromBinaryData(List binaryData) { + if (binaryData == null || binaryData.size() == 0 || binaryData.size() > 1) { throw new IllegalArgumentException("Binary data is null or has more than one byte array in it."); } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBrowser.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBrowser.java index 0f748d645da5..bba1c750e2a1 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBrowser.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageBrowser.java @@ -21,12 +21,12 @@ final class MessageBrowser implements IMessageBrowser { private MessageReceiver messageReceiver = null; private MessageSender messageSender = null; - public MessageBrowser(MessageReceiver messageReceiver) { + MessageBrowser(MessageReceiver messageReceiver) { this.messageReceiver = messageReceiver; this.isReceiveSideBrowser = true; } - public MessageBrowser(MessageSender messageSender) { + MessageBrowser(MessageSender messageSender) { this.messageSender = messageSender; this.isReceiveSideBrowser = false; } @@ -58,8 +58,7 @@ public CompletableFuture peekAsync() { @Override public CompletableFuture peekAsync(long fromSequenceNumber) { - return this.peekBatchAsync(fromSequenceNumber, 1).thenApplyAsync((c) -> - { + return this.peekBatchAsync(fromSequenceNumber, 1).thenApplyAsync((c) -> { IMessage message = null; Iterator iterator = c.iterator(); if (iterator.hasNext()) { @@ -87,8 +86,7 @@ public CompletableFuture> peekBatchAsync(long fromSequenceN peekFuture = this.messageSender.getInternalSender().peekMessagesAsync(fromSequenceNumber, messageCount); } - return peekFuture.thenApplyAsync((peekedMessages) -> - { + return peekFuture.thenApplyAsync((peekedMessages) -> { ArrayList convertedMessages = new ArrayList(); if (peekedMessages != null) { TRACE_LOGGER.debug("Browsing messages from sequence number '{}' returned '{}' messages", fromSequenceNumber, peekedMessages.size()); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java index 63aeca2d281c..06e4d1a67823 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageConverter.java @@ -13,8 +13,13 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.*; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; import com.microsoft.azure.servicebus.primitives.ClientConstants; import com.microsoft.azure.servicebus.primitives.MessageWithDeliveryTag; @@ -92,13 +97,13 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton Section body = amqpMessage.getBody(); if (body != null) { if (body instanceof Data) { - Binary messageData = ((Data)body).getValue(); + Binary messageData = ((Data) body).getValue(); brokeredMessage = new Message(Utils.fromBinay(messageData.getArray())); } else if (body instanceof AmqpValue) { - Object messageData = ((AmqpValue)body).getValue(); + Object messageData = ((AmqpValue) body).getValue(); brokeredMessage = new Message(MessageBody.fromValueData(messageData)); } else if (body instanceof AmqpSequence) { - List messageData = ((AmqpSequence)body).getValue(); + List messageData = ((AmqpSequence) body).getValue(); brokeredMessage = new Message(Utils.fromSequence(messageData)); } else { // Should never happen @@ -149,25 +154,25 @@ public static Message convertAmqpMessageToBrokeredMessage(org.apache.qpid.proton String entryName = entry.getKey().toString(); switch (entryName) { case ClientConstants.ENQUEUEDTIMEUTCNAME: - brokeredMessage.setEnqueuedTimeUtc(((Date)entry.getValue()).toInstant()); + brokeredMessage.setEnqueuedTimeUtc(((Date) entry.getValue()).toInstant()); break; case ClientConstants.SCHEDULEDENQUEUETIMENAME: - brokeredMessage.setScheduledEnqueueTimeUtc(((Date)entry.getValue()).toInstant()); + brokeredMessage.setScheduledEnqueueTimeUtc(((Date) entry.getValue()).toInstant()); break; case ClientConstants.SEQUENCENUBMERNAME: - brokeredMessage.setSequenceNumber((long)entry.getValue()); + brokeredMessage.setSequenceNumber((long) entry.getValue()); break; case ClientConstants.LOCKEDUNTILNAME: - brokeredMessage.setLockedUntilUtc(((Date)entry.getValue()).toInstant()); + brokeredMessage.setLockedUntilUtc(((Date) entry.getValue()).toInstant()); break; case ClientConstants.PARTITIONKEYNAME: - brokeredMessage.setPartitionKey((String)entry.getValue()); + brokeredMessage.setPartitionKey((String) entry.getValue()); break; case ClientConstants.VIAPARTITIONKEYNAME: - brokeredMessage.setViaPartitionKey((String)entry.getValue()); + brokeredMessage.setViaPartitionKey((String) entry.getValue()); break; case ClientConstants.DEADLETTERSOURCENAME: - brokeredMessage.setDeadLetterSource((String)entry.getValue()); + brokeredMessage.setDeadLetterSource((String) entry.getValue()); break; default: break; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageHandlerOptions.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageHandlerOptions.java index 8031cb945ac3..a30c3bccb95e 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageHandlerOptions.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageHandlerOptions.java @@ -86,7 +86,9 @@ public Duration getMaxAutoRenewDuration() { * Gets the time to wait for receiving a message. Defaults to 1 minute. * @return The wait duration for receive calls. */ - public Duration getMessageWaitDuration() { return this.messageWaitDuration; } + public Duration getMessageWaitDuration() { + return this.messageWaitDuration; + } @Override public String toString() { diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java index eb50ba544128..479a0d310744 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/QueueClient.java @@ -53,13 +53,11 @@ public QueueClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveM } } - public QueueClient(String namespace, String queuePath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + public QueueClient(String namespace, String queuePath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(Util.convertNamespaceToEndPointURI(namespace), queuePath, clientSettings, receiveMode); } - public QueueClient(URI namespaceEndpointURI, String queuePath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + public QueueClient(URI namespaceEndpointURI, String queuePath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode, queuePath); CompletableFuture factoryFuture = MessagingFactory.createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings); Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createInternals(f, queuePath, receiveMode), MessagingFactory.INTERNAL_THREAD_POOL)); @@ -90,26 +88,19 @@ private CompletableFuture createInternals(MessagingFactory factory, String return CompletableFuture.allOf(postSessionBrowserFuture, messagePumpInitFuture); } - private CompletableFuture createSenderAsync() - { + private CompletableFuture createSenderAsync() { synchronized (this.senderCreationLock) { - if(this.senderCreationFuture == null) - { - this.senderCreationFuture = new CompletableFuture(); - ClientFactory.createMessageSenderFromEntityPathAsync(this.factory, this.queuePath, MessagingEntityType.QUEUE).handleAsync((sender, ex) -> - { - if(ex == null) - { + if (this.senderCreationFuture == null) { + this.senderCreationFuture = new CompletableFuture<>(); + ClientFactory.createMessageSenderFromEntityPathAsync(this.factory, this.queuePath, MessagingEntityType.QUEUE).handleAsync((sender, ex) -> { + if (ex == null) { this.sender = sender; this.senderCreationFuture.complete(null); - } - else - { + } else { Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex); this.senderCreationFuture.completeExceptionally(cause); // Set it to null so next call will retry sender creation - synchronized (this.senderCreationLock) - { + synchronized (this.senderCreationLock) { this.senderCreationFuture = null; } } @@ -121,20 +112,13 @@ private CompletableFuture createSenderAsync() } } - private CompletableFuture closeSenderAsync() - { - synchronized (this.senderCreationLock) - { - if(this.senderCreationFuture != null) - { - CompletableFuture senderCloseFuture = this.senderCreationFuture.thenComposeAsync((v) -> { - return this.sender.closeAsync(); - }, MessagingFactory.INTERNAL_THREAD_POOL); + private CompletableFuture closeSenderAsync() { + synchronized (this.senderCreationLock) { + if (this.senderCreationFuture != null) { + CompletableFuture senderCloseFuture = this.senderCreationFuture.thenComposeAsync((v) -> this.sender.closeAsync(), MessagingFactory.INTERNAL_THREAD_POOL); this.senderCreationFuture = null; return senderCloseFuture; - } - else - { + } else { return CompletableFuture.completedFuture(null); } } @@ -168,17 +152,13 @@ public void sendBatch(Collection messages, TransactionContex @Override public CompletableFuture sendAsync(IMessage message) { return this.createSenderAsync().thenComposeAsync((v) -> - { - return this.sender.sendAsync(message); - }, MessagingFactory.INTERNAL_THREAD_POOL); + this.sender.sendAsync(message), MessagingFactory.INTERNAL_THREAD_POOL); } @Override public CompletableFuture sendAsync(IMessage message, TransactionContext transaction) { return this.createSenderAsync().thenComposeAsync((v) -> - { - return this.sender.sendAsync(message, transaction); - }); + this.sender.sendAsync(message, transaction)); } @Override @@ -189,9 +169,7 @@ public CompletableFuture sendBatchAsync(Collection mes @Override public CompletableFuture sendBatchAsync(Collection messages, TransactionContext transaction) { return this.createSenderAsync().thenComposeAsync((v) -> - { - return this.sender.sendBatchAsync(messages, transaction); - }, MessagingFactory.INTERNAL_THREAD_POOL); + this.sender.sendBatchAsync(messages, transaction), MessagingFactory.INTERNAL_THREAD_POOL); } @Override @@ -202,17 +180,12 @@ public CompletableFuture scheduleMessageAsync(IMessage message, Instant sc @Override public CompletableFuture scheduleMessageAsync(IMessage message, Instant scheduledEnqueueTimeUtc, TransactionContext transaction) { return this.createSenderAsync().thenComposeAsync((v) -> - { - return this.sender.scheduleMessageAsync(message, scheduledEnqueueTimeUtc, transaction); - }, MessagingFactory.INTERNAL_THREAD_POOL); + this.sender.scheduleMessageAsync(message, scheduledEnqueueTimeUtc, transaction), MessagingFactory.INTERNAL_THREAD_POOL); } @Override public CompletableFuture cancelScheduledMessageAsync(long sequenceNumber) { - return this.createSenderAsync().thenComposeAsync((v) -> - { - return this.sender.cancelScheduledMessageAsync(sequenceNumber); - }, MessagingFactory.INTERNAL_THREAD_POOL); + return this.createSenderAsync().thenComposeAsync((v) -> this.sender.cancelScheduledMessageAsync(sequenceNumber), MessagingFactory.INTERNAL_THREAD_POOL); } @Override diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java index d67a7ea33dce..17970f17b9aa 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionBrowser.java @@ -19,7 +19,7 @@ final class SessionBrowser { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SessionBrowser.class); private static final int PAGESIZE = 100; // .net DateTime.MaxValue need to be passed - private static final Date MAXDATE = new Date(253402300800000l); + private static final Date MAXDATE = new Date(253402300800000L); private final MessagingFactory messagingFactory; private final String entityPath; @@ -43,8 +43,7 @@ public CompletableFuture> getMessageSessionsAsync(Da private CompletableFuture> getMessageSessionsAsync(Date lastUpdatedTime, int lastReceivedSkip, String lastSessionId) { TRACE_LOGGER.debug("Getting '{}' browsable sessions from entity '{}', lastUpdatedTime '{}', lastReceivedSkip '{}', lastSessionId '{}'", PAGESIZE, this.entityPath, lastUpdatedTime, lastReceivedSkip, lastSessionId); - return this.miscRequestResponseHandler.getMessageSessionsAsync(lastUpdatedTime, lastReceivedSkip, PAGESIZE, lastSessionId).thenComposeAsync((p) -> - { + return this.miscRequestResponseHandler.getMessageSessionsAsync(lastUpdatedTime, lastReceivedSkip, PAGESIZE, lastSessionId).thenComposeAsync((p) -> { int newLastReceivedSkip = p.getSecondItem(); String[] sessionIds = p.getFirstItem(); ArrayList sessionsList = new ArrayList<>(); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionHandlerOptions.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionHandlerOptions.java index d3585c68f7fa..ccfac87ca552 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionHandlerOptions.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SessionHandlerOptions.java @@ -112,7 +112,9 @@ public Duration getMaxAutoRenewDuration() { * Gets the time to wait for receiving a message. Defaults to 1 minute. * @return The wait duration for receive calls. */ - public Duration getMessageWaitDuration() { return this.messageWaitDuration; } + public Duration getMessageWaitDuration() { + return this.messageWaitDuration; + } @Override public String toString() { diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java index b72e252392ea..3a23ccefe9c2 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/SubscriptionClient.java @@ -25,8 +25,7 @@ import com.microsoft.azure.servicebus.rules.Filter; import com.microsoft.azure.servicebus.rules.RuleDescription; -public final class SubscriptionClient extends InitializableEntity implements ISubscriptionClient -{ +public final class SubscriptionClient extends InitializableEntity implements ISubscriptionClient { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SubscriptionClient.class); private static final String SUBSCRIPTIONS_DELIMITER = "/subscriptions/"; private final ReceiveMode receiveMode; @@ -38,31 +37,26 @@ public final class SubscriptionClient extends InitializableEntity implements ISu public static final String DEFAULT_RULE_NAME = "$Default"; - private SubscriptionClient(ReceiveMode receiveMode, String subscriptionPath) - { + private SubscriptionClient(ReceiveMode receiveMode, String subscriptionPath) { super(StringUtil.getShortRandomString()); this.receiveMode = receiveMode; this.subscriptionPath = subscriptionPath; } - public SubscriptionClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + public SubscriptionClient(ConnectionStringBuilder amqpConnectionStringBuilder, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode, amqpConnectionStringBuilder.getEntityPath()); CompletableFuture factoryFuture = MessagingFactory.createFromConnectionStringBuilderAsync(amqpConnectionStringBuilder); Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createPumpAndBrowserAsync(f), MessagingFactory.INTERNAL_THREAD_POOL)); - if(TRACE_LOGGER.isInfoEnabled()) - { + if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info("Created subscription client to connection string '{}'", amqpConnectionStringBuilder.toLoggableString()); } } - public SubscriptionClient(String namespace, String subscriptionPath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + public SubscriptionClient(String namespace, String subscriptionPath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(Util.convertNamespaceToEndPointURI(namespace), subscriptionPath, clientSettings, receiveMode); } - public SubscriptionClient(URI namespaceEndpointURI, String subscriptionPath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + public SubscriptionClient(URI namespaceEndpointURI, String subscriptionPath, ClientSettings clientSettings, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode, subscriptionPath); CompletableFuture factoryFuture = MessagingFactory.createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings); Utils.completeFuture(factoryFuture.thenComposeAsync((f) -> this.createPumpAndBrowserAsync(f), MessagingFactory.INTERNAL_THREAD_POOL)); @@ -71,15 +65,13 @@ public SubscriptionClient(URI namespaceEndpointURI, String subscriptionPath, Cli } } - SubscriptionClient(MessagingFactory factory, String subscriptionPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException - { + SubscriptionClient(MessagingFactory factory, String subscriptionPath, ReceiveMode receiveMode) throws InterruptedException, ServiceBusException { this(receiveMode, subscriptionPath); Utils.completeFuture(this.createPumpAndBrowserAsync(factory)); TRACE_LOGGER.info("Created subscription client to subscripton '{}'", subscriptionPath); } - private CompletableFuture createPumpAndBrowserAsync(MessagingFactory factory) - { + private CompletableFuture createPumpAndBrowserAsync(MessagingFactory factory) { this.factory = factory; CompletableFuture postSessionBrowserFuture = MiscRequestResponseOperationHandler.create(factory, this.subscriptionPath, MessagingEntityType.SUBSCRIPTION).thenAcceptAsync((msoh) -> { this.miscRequestResponseHandler = msoh; @@ -138,8 +130,7 @@ public Collection getRules() throws ServiceBusException, Interr } @Override - public CompletableFuture> getRulesAsync() - { + public CompletableFuture> getRulesAsync() { // Skip and Top can be used to implement pagination. // In this case, we are trying to fetch all the rules associated with the subscription. int skip = 0, top = Integer.MAX_VALUE; @@ -393,21 +384,18 @@ public void setPrefetchCount(int prefetchCount) throws ServiceBusException { @Override public String getTopicName() { - String entityPath = this.getEntityPath(); - String[] parts = Pattern.compile(SUBSCRIPTIONS_DELIMITER, Pattern.CASE_INSENSITIVE).split(entityPath, 2); - return parts[0]; + String entityPath = this.getEntityPath(); + String[] parts = Pattern.compile(SUBSCRIPTIONS_DELIMITER, Pattern.CASE_INSENSITIVE).split(entityPath, 2); + return parts[0]; } @Override public String getSubscriptionName() { String entityPath = this.getEntityPath(); String[] parts = Pattern.compile(SUBSCRIPTIONS_DELIMITER, Pattern.CASE_INSENSITIVE).split(entityPath, 2); - if(parts.length == 2) - { + if (parts.length == 2) { return parts[1]; - } - else - { + } else { throw new RuntimeException("Invalid entity path in the subscription client."); } }